Compare commits

...

43 Commits

Author SHA1 Message Date
Joseph Doherty 2e4ba11a9f Merge remote-tracking branch 'origin/main' into agent-1/issue-17-implement-dashboard-authentication 2026-04-26 18:15:38 -04:00
Joseph Doherty ff86b3f0b0 Implement dashboard authentication 2026-04-26 18:15:22 -04:00
dohertj2 653f17c669 Merge pull request #75 from agent-2/issue-26-implement-register-and-unregister
Issue #26: implement Register and Unregister
2026-04-26 18:13:41 -04:00
Joseph Doherty 556c3bfa83 Implement worker register and unregister 2026-04-26 18:08:45 -04:00
dohertj2 9b3637257c Merge pull request #74 from agent-3/issue-14-implement-event-streaming-and-backpressure
Issue #14: implement event streaming and backpressure
2026-04-26 18:03:32 -04:00
Joseph Doherty 77eac95f33 Issue #14: implement event streaming and backpressure 2026-04-26 17:59:37 -04:00
dohertj2 015fa1f50d Merge pull request #73 from agent-1/issue-15-implement-dashboard-snapshot-service
Issue #15: implement dashboard snapshot service
2026-04-26 17:56:01 -04:00
dohertj2 dede407304 Merge pull request #72 from agent-2/issue-25-implement-sta-command-dispatcher
Issue #25: implement STA command dispatcher
2026-04-26 17:53:32 -04:00
Joseph Doherty 0d96963c99 Merge remote-tracking branch 'origin/main' into agent-1/issue-15-implement-dashboard-snapshot-service 2026-04-26 17:52:58 -04:00
Joseph Doherty 3661420f0a Issue #15: implement dashboard snapshot service 2026-04-26 17:49:59 -04:00
Joseph Doherty 14419853c7 Issue #25: implement sta command dispatcher 2026-04-26 17:49:01 -04:00
dohertj2 a20517f5ad Merge pull request #71 from agent-3/issue-13-implement-public-grpc-service
Issue #13: implement public grpc service
2026-04-26 17:48:35 -04:00
dohertj2 626e7762d9 Merge PR #70: Issue #31 implement MXSTATUS_PROXY and HRESULT conversion
Verified after merging current main with dotnet build src\\MxGateway.sln, dotnet test src\\MxGateway.Worker.Tests\\MxGateway.Worker.Tests.csproj -p:Platform=x86, and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 17:43:08 -04:00
Joseph Doherty 8d6d3f6188 Issue #13: implement public grpc service 2026-04-26 17:42:46 -04:00
Joseph Doherty 276288ad87 Merge remote-tracking branch 'origin/main' into agent-2/issue-31-implement-mxstatus-proxy-and-hresult-conversion 2026-04-26 17:39:48 -04:00
dohertj2 76bd3de5a2 Merge PR #69: Issue #24 create MXAccess COM object on STA
Verified with dotnet build src\\MxGateway.sln, dotnet test src\\MxGateway.Worker.Tests\\MxGateway.Worker.Tests.csproj -p:Platform=x86, and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 17:39:04 -04:00
Joseph Doherty 29455fc1f6 Issue #31: implement mxstatus proxy and hresult conversion 2026-04-26 17:35:30 -04:00
dohertj2 5511609880 Merge PR #68: Issue #12 implement session manager and registry
Verified with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln.
2026-04-26 17:34:19 -04:00
Joseph Doherty 451dccf7e3 Issue #24: create mxaccess com object on sta 2026-04-26 17:34:12 -04:00
dohertj2 cde9c89386 Merge PR #67: Issue #30 implement value conversion
Verified with dotnet build src\\MxGateway.sln, dotnet test src\\MxGateway.Worker.Tests\\MxGateway.Worker.Tests.csproj -p:Platform=x86, and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 17:30:19 -04:00
Joseph Doherty d496f1fd75 Issue #12: implement session manager and registry 2026-04-26 17:29:47 -04:00
Joseph Doherty 6559672fc1 Issue #30: implement value conversion 2026-04-26 17:26:36 -04:00
dohertj2 97c30b9d00 Merge PR #66: Issue #23 implement STA runtime and message pump
Verified with dotnet build src\\MxGateway.sln, dotnet test src\\MxGateway.Worker.Tests\\MxGateway.Worker.Tests.csproj -p:Platform=x86, and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 17:23:02 -04:00
dohertj2 603aff7004 Merge PR #65: Issue #22 implement pipe client and frame protocol
Verified with dotnet build src\\MxGateway.sln, dotnet test src\\MxGateway.Worker.Tests\\MxGateway.Worker.Tests.csproj -p:Platform=x86, and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 17:20:28 -04:00
Joseph Doherty e81682e367 Issue #23: implement sta runtime and message pump 2026-04-26 17:19:00 -04:00
Joseph Doherty d5a982152b Issue #22: implement pipe client and frame protocol 2026-04-26 17:16:49 -04:00
dohertj2 0b0be7098e Merge PR #64: Issue #11 implement gateway WorkerClient
Verified with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln.
2026-04-26 17:14:03 -04:00
Joseph Doherty fce9e99553 Issue #11: implement gateway workerclient 2026-04-26 17:09:51 -04:00
dohertj2 c8fb3e91a3 Merge PR #63: Issue #8 add gRPC authentication and scope authorization
Verified with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln.
2026-04-26 17:06:23 -04:00
dohertj2 8ce327e6f4 Merge PR #62: Issue #7 implement local api key admin cli
Verified with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln.
2026-04-26 17:02:09 -04:00
Joseph Doherty fad0ac9948 Issue #8: add grpc authentication and scope authorization 2026-04-26 17:01:59 -04:00
dohertj2 9cb2f1c5cd Merge PR #61: Issue #21 implement worker bootstrap and options
Verified with dotnet build src\\MxGateway.sln, dotnet test src\\MxGateway.Worker.Tests\\MxGateway.Worker.Tests.csproj -p:Platform=x86, and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 16:56:52 -04:00
Joseph Doherty da9ffe0e11 Issue #7: implement local api key admin cli 2026-04-26 16:56:12 -04:00
Joseph Doherty 0af1427859 Issue #21: implement worker bootstrap and options 2026-04-26 16:53:06 -04:00
dohertj2 e2b4dfcb32 Merge PR #60: Issue #10 implement worker process launcher
Verified with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 16:50:00 -04:00
dohertj2 3b3e41acf4 Merge PR #59: Issue #6 implement API key hashing and verification
Verified with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln.
2026-04-26 16:46:06 -04:00
Joseph Doherty c1188c6957 Issue #10: implement worker process launcher 2026-04-26 16:45:42 -04:00
dohertj2 4094e64ee0 Merge PR #58: Issue #20 scaffold worker project
Verified with dotnet build src\\MxGateway.sln, dotnet test src\\MxGateway.Worker.Tests\\MxGateway.Worker.Tests.csproj -p:Platform=x86, and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 16:41:34 -04:00
Joseph Doherty 696be17139 Issue #6: implement api key hashing and verification 2026-04-26 16:40:46 -04:00
Joseph Doherty b42c3c8b3b Issue #20: scaffold worker project 2026-04-26 16:37:23 -04:00
dohertj2 420a813967 Merge PR #56: Issue #5 implement SQLite auth store and migrations
Verified with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln.
2026-04-26 16:34:28 -04:00
Joseph Doherty ec1155de6d Issue #5: implement sqlite auth store and migrations 2026-04-26 16:29:38 -04:00
dohertj2 0c539834dc Merge PR #55: Issue #9 implement worker frame protocol
Verified with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln.
2026-04-26 16:24:38 -04:00
203 changed files with 14875 additions and 35 deletions
+62
View File
@@ -0,0 +1,62 @@
# Worker Process Launcher
The gateway uses `WorkerProcessLauncher` to validate and start one worker
process for a gateway session. The launcher owns process start semantics only;
pipe handshaking and `WorkerReady` validation remain part of the worker client
startup path.
## Launch Inputs
`WorkerProcessLaunchRequest` carries the per-session bootstrap values:
- `SessionId`,
- `PipeName`,
- `ProtocolVersion`,
- `Nonce`,
- optional `PipeReservation` cleanup handle.
The launcher passes `SessionId`, `PipeName`, and `ProtocolVersion` as command
line arguments:
```text
--session-id <sessionId> --pipe-name <pipeName> --protocol-version <version>
```
The launcher sets the nonce through the `MXGATEWAY_WORKER_NONCE` environment
variable. The nonce is not included in `WorkerProcessCommandLine` so logs and
diagnostics can report the launch command without exposing the secret.
## Validation And Cleanup
Before starting the process, the launcher validates that the configured worker
path exists, has a `.exe` extension, contains a valid Windows Portable
Executable header, and matches the configured `RequiredArchitecture`.
After the process starts, `IWorkerStartupProbe` waits for startup readiness.
The default probe only verifies that the worker did not exit immediately. The
worker client replaces this probe when pipe connection, hello, and
`WorkerReady` handling are implemented.
If startup fails or exceeds `WorkerOptions.StartupTimeoutSeconds`, the launcher
kills the worker process tree, disposes the process handle, disposes the
optional pipe reservation, records a worker kill metric, and reports a
`WorkerProcessLaunchException`.
## Verification
Run the focused launcher tests after changing process launch behavior:
```bash
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter WorkerProcessLauncherTests
```
Run the gateway build because the launcher is part of `MxGateway.Server`:
```bash
dotnet build src/MxGateway.Server/MxGateway.Server.csproj
```
## Related Documentation
- [Gateway Process Detailed Design](./gateway-process-design.md)
- [Worker Frame Protocol](./WorkerFrameProtocol.md)
+13 -7
View File
@@ -257,19 +257,18 @@ Do not show API key secrets or pepper values.
## Authentication And Authorization
Dashboard access should use the same API-key authentication model as gRPC where
Dashboard access uses the same API-key authentication model as gRPC where
practical.
Recommended v1 behavior:
Implemented v1 behavior:
- dashboard disabled by default unless configured,
- when enabled, require API key auth,
- require `admin` scope for dashboard access,
- accept API key through a secure cookie established by a simple login form, or
through reverse-proxy/header configuration for local deployments,
- do not put API keys in query strings.
- accept API key through a secure cookie established by a simple login form,
- do not put API keys in query strings,
- validate anti-forgery tokens for login and logout posts.
Simplest implementation path:
The implementation path is:
1. Add `/dashboard/login`.
2. User submits API key over HTTPS.
@@ -281,6 +280,13 @@ Simplest implementation path:
For local development, allow an explicit `Dashboard:AllowAnonymousLocalhost`
option. It must default to false.
`DashboardAuthenticator` keeps API-key validation outside UI components. It
formats the submitted key as a bearer authorization header for
`IApiKeyVerifier`, rejects non-admin keys when `Dashboard:RequireAdminScope` is
enabled, and creates the dashboard cookie principal without storing raw API key
material. `DashboardAuthorizationHandler` enforces the cookie, admin-scope, and
explicit loopback bypass decisions for all protected dashboard routes.
## Configuration
Suggested configuration:
+149 -14
View File
@@ -64,8 +64,8 @@ MxGateway.Server
Configuration
Grpc
MxAccessGatewayService
RequestReplyMapper
EventMapper
MxAccessGrpcRequestValidator
MxAccessGrpcMapper
Dashboard
Pages
Components
@@ -105,6 +105,15 @@ service MxAccessGateway {
}
```
`MxAccessGatewayService` implements these public RPCs in the gateway process.
It validates public requests with `MxAccessGrpcRequestValidator`, delegates
session lifecycle and command routing to `ISessionManager`, and maps worker
command replies and events through `MxAccessGrpcMapper`. Session lookup,
validation, and worker transport failures become gRPC status errors. MXAccess
method replies that reached the worker remain `MxCommandReply` payloads so
HRESULT values, status arrays, and method-specific reply fields survive
transport boundaries.
Add this later only after the command and event model is stable:
```protobuf
@@ -197,13 +206,23 @@ accounting and a clear fan-out policy.
Behavior:
1. Validate session id and authorize event access.
2. Attach a stream cursor to the session event channel.
3. Send events in worker sequence order.
4. Stop on client cancellation, session close, or session fault.
5. Emit a terminal status when the session faults if gRPC status alone cannot
2. Attach the single active subscriber lease for the session.
3. Read worker events into a bounded public stream queue.
4. Send events in worker sequence order.
5. Stop on client cancellation, session close, or session fault.
6. Emit a terminal status when the session faults if gRPC status alone cannot
preserve the required details.
The gateway must not reorder events from one worker.
`EventStreamService` owns subscriber tracking and public stream backpressure.
The default policy allows one active subscriber per session. A second subscriber
is rejected with `EventSubscriberAlreadyActive`. Stream cancellation releases
the subscriber lease so a later stream can attach to the session.
The gateway must not reorder events from one worker. `EventStreamService` writes
mapped events to a bounded first-in, first-out queue and faults the session with
`EventQueueOverflow` if the queue fills. The gateway does not synthesize
`OperationComplete`; it forwards that family only when the worker reports a
native MXAccess `OperationComplete` event.
## Web Dashboard
@@ -330,6 +349,20 @@ The worker remains authoritative for MXAccess handles. The gateway may keep a
shadow state for diagnostics, but it must not invent, rewrite, or recycle
MXAccess handles.
`SessionManager` owns the current in-memory session registry. It allocates a
session id, creates the worker pipe name and nonce, registers the session before
worker startup, and removes the session if startup fails. A successful
`OpenSession` attaches the ready `IWorkerClient` and transitions the session to
`Ready`.
Only `Ready` sessions accept command and event operations. `CloseSession` is
idempotent for sessions still known to the registry: the first close shuts down
the worker, and later closes return the final `Closed` state. Lease handling is
exposed as a session hook so a monitor can close expired sessions without
embedding lease policy in the worker client. Gateway shutdown walks the
registry, closes each known session, and kills a worker if graceful shutdown
fails.
## Worker Launch
The gateway should launch the worker using explicit configuration:
@@ -360,6 +393,15 @@ Before launch, validate:
- worker file version or product version is acceptable,
- worker is expected to be x86.
`WorkerProcessLauncher` implements the first validation layer now: it resolves
the worker executable path, requires a `.exe`, validates the Windows Portable
Executable header, and verifies the configured processor architecture. It passes
only `--session-id`, `--pipe-name`, and `--protocol-version` on the command
line. The per-session nonce is set through `MXGATEWAY_WORKER_NONCE` so the
command line remains safe to log. Startup failures and startup timeouts kill and
dispose the worker process and the pre-created pipe reservation before the
session manager observes the failure.
## Worker IPC
The gateway creates the pipe server before launching the worker.
@@ -402,7 +444,7 @@ session ids as protocol faults and close the session.
`WorkerClient` is the gateway-side object that owns one worker connection.
Suggested public shape:
Current public shape:
```csharp
public interface IWorkerClient : IAsyncDisposable
@@ -410,6 +452,7 @@ public interface IWorkerClient : IAsyncDisposable
string SessionId { get; }
int? ProcessId { get; }
WorkerClientState State { get; }
DateTimeOffset LastHeartbeatAt { get; }
Task StartAsync(CancellationToken cancellationToken);
Task<WorkerCommandReply> InvokeAsync(
@@ -429,12 +472,17 @@ Internally it owns:
- pipe stream,
- read loop,
- write loop,
- bounded outbound command/control channel,
- outbound command/control channel serialized by the write loop,
- bounded inbound event channel,
- pending command dictionary keyed by correlation id,
- heartbeat monitor,
- terminal fault source.
`StartAsync` sends `GatewayHello`, verifies the `WorkerHello` protocol version
and nonce, waits for `WorkerReady`, and only then exposes `Ready` state. The
read loop starts after readiness so the handshake has a single owner for its
ordered frames.
### Read Loop
The read loop:
@@ -546,7 +594,8 @@ worker MXAccess event
-> worker outbound event queue
-> worker pipe writer
-> gateway read loop
-> session event channel
-> worker client event queue
-> EventStreamService bounded stream queue
-> gRPC StreamEvents
```
@@ -560,13 +609,15 @@ The gateway should record:
Default backpressure policy for parity testing should be fail-fast:
1. If the session event channel fills, fault the session.
1. If the worker client event queue fills, fault the worker client.
2. If the public stream queue fills, fault the gateway session.
2. Preserve the overflow details in logs and metrics.
3. Do not silently drop data-change events.
Do not set a production event-rate target before measurement. Emit event rate,
queue depth, stream send latency, and overflow metrics. Later production modes
may support explicit coalescing by item handle as an opt-in behavior.
Do not set a production event-rate target before measurement. `GatewayMetrics`
records received event counts by family, queue depth, stream disconnects, and
overflow counts. Later production modes may support explicit coalescing by item
handle as an opt-in behavior.
The gateway should not synthesize `OperationComplete` from write completion,
command replies, ASB completion queues, or completion-only status frames. Forward
@@ -589,6 +640,39 @@ The gateway should split the key into a stable key id and secret component,
load the key record by id, hash the presented secret, and compare using a
constant-time comparison.
`ApiKeyParser` accepts only `authorization: Bearer mxgw_<key-id>_<secret>`.
Malformed headers fail before any database lookup. The parsed raw secret is
kept only long enough for `ApiKeySecretHasher` to compute an HMAC-SHA256 hash
using the configured `Authentication:PepperSecretName` lookup in application
configuration. The raw secret is not stored in the auth database, identity
model, logs, or verification result.
`ApiKeyVerifier` loads the stored key record by key id, rejects revoked keys,
hashes the presented secret, and compares the stored and presented hashes with
`CryptographicOperations.FixedTimeEquals`. A successful verification returns an
`ApiKeyIdentity` with key id, key prefix, display name, and scopes. Failure
results distinguish malformed credentials, missing keys, revoked keys, missing
pepper configuration, and hash mismatch for internal authorization handling.
`GatewayGrpcAuthorizationInterceptor` enforces this authentication model for
public gRPC calls. Missing, malformed, revoked, unknown, or mismatched keys fail
with `Unauthenticated`. Authenticated calls missing the scope required by the
RPC fail with `PermissionDenied`. The interceptor applies to unary calls and
server-streaming calls and stores the authenticated `ApiKeyIdentity` in
`IGatewayRequestIdentityAccessor` for the duration of the request handler.
`Authentication:Mode` set to `Disabled` bypasses API-key verification for local
development only.
Dashboard authentication reuses the API-key verifier and scope model. The
dashboard login endpoint accepts the key in a form post, checks `admin` scope
when `Dashboard:RequireAdminScope` is enabled, and signs in with the
`MxGateway.Dashboard` cookie scheme. The cookie is HTTP-only, secure, strict
SameSite, and scoped with the `__Host-MxGatewayDashboard` name. Logout clears
that cookie. Login and logout posts use anti-forgery validation, and dashboard
API keys are not accepted in query strings. `Dashboard:AllowAnonymousLocalhost`
allows only loopback requests to bypass the dashboard cookie requirement and
defaults to `false`.
Recommended scopes:
- `session:open`
@@ -608,10 +692,44 @@ gRPC admin API. It should initialize the auth database, create keys, list keys
without secrets, revoke keys, rotate keys, and print raw secrets only once at
creation.
`MxGateway.Server` exposes local API-key administration as an `apikey`
subcommand before the web host starts:
```bash
MxGateway.Server apikey init-db --sqlite-path C:\ProgramData\MxGateway\gateway-auth.db
MxGateway.Server apikey create-key --key-id operator01 --display-name Operator --scopes session:open,events:read
MxGateway.Server apikey list-keys --json
MxGateway.Server apikey revoke-key --key-id operator01
MxGateway.Server apikey rotate-key --key-id operator01 --json
```
The subcommands accept `--sqlite-path`, `--pepper`, and `--json`. `--pepper`
sets the local `MxGateway:ApiKeyPepper` configuration value for the command
process; deployments should normally provide the pepper through the configured
secret source. `create-key` and `rotate-key` print the full raw API key exactly
once. `list-keys` never prints raw secrets or `secret_hash` values.
SQLite auth storage should use startup migrations with a `schema_version` table.
Migrations should run inside transactions and fail startup if the database
schema is newer than the running binary understands.
The v1 auth store uses `Microsoft.Data.Sqlite` and creates the
`schema_version`, `api_keys`, and `api_key_audit` tables through
`SqliteAuthStoreMigrator`. `AuthStoreMigrationHostedService` runs those
migrations at gateway startup when API-key authentication and
`Authentication:RunMigrationsOnStartup` are enabled. A database with a newer
schema version fails startup instead of being modified by an older gateway
binary.
`IApiKeyStore` reads stored key records and exposes an active-key lookup that
excludes rows with `revoked_utc` set. Hash verification belongs to the API-key
hashing layer, but the store preserves the `secret_hash` bytes, display name,
scopes, timestamps, and revocation state needed by that layer.
`IApiKeyAuditStore` appends audit events to `api_key_audit` and returns recent
events for diagnostics and future administrative tools. Audit records store key
ids and event metadata only; they do not store raw API key secrets.
Commands requiring authorization:
- writes,
@@ -620,6 +738,20 @@ Commands requiring authorization:
- worker shutdown diagnostics,
- metadata queries if they expose sensitive plant structure.
Current gRPC scope mapping:
- `OpenSession` requires `session:open`.
- `CloseSession` requires `session:close`.
- `StreamEvents` and `DrainEvents` require `events:read`.
- read-style MXAccess commands such as `Register`, `AddItem`, `Advise`, and
`Ping` require `invoke:read`.
- `Write` and `Write2` require `invoke:write`.
- `WriteSecured`, `WriteSecured2`, and `AuthenticateUser` require
`invoke:secure`.
- metadata commands such as `ArchestrAUserToId`, `GetSessionState`, and
`GetWorkerInfo` require `metadata:read`.
- `ShutdownWorker` requires `admin`.
### Worker IPC
Named pipes should be local only. Pipe ACLs should restrict access to:
@@ -762,6 +894,9 @@ workers and fake transports.
Focused tests:
- session state transitions,
- gRPC API-key authentication for unary and streaming calls,
- gRPC scope mapping for sessions, invokes, events, metadata, and admin
commands,
- worker startup failures,
- protocol version mismatch,
- malformed frame handling,
+2 -1
View File
@@ -189,6 +189,8 @@ Tests:
Labels: `area:worker`, `type:feature`, `priority:p0`
Status: implemented.
Deliverables:
- `Register`,
@@ -447,4 +449,3 @@ Acceptance criteria:
- each public method has planned parity fixture or documented gap,
- gateway results preserve HRESULT/status/value/event shape.
+85
View File
@@ -26,6 +26,33 @@ Style guides:
- [C# Style Guide](./style-guides/CSharpStyleGuide.md)
- [Protobuf Style Guide](./style-guides/ProtobufStyleGuide.md)
## Build And Test
Build the SDK-style worker project with the .NET SDK MSBuild entry point. The
project targets .NET Framework 4.8, but the SDK resolver comes from the .NET SDK
installation:
```powershell
dotnet msbuild src\MxGateway.Worker\MxGateway.Worker.csproj /restore /p:Configuration=Debug /p:Platform=x86
```
`docs/toolchain-links.md` records the Visual Studio MSBuild executable for
classic .NET Framework and COM interop builds:
```powershell
& "C:\Program Files (x86)\Microsoft Visual Studio\2022\BuildTools\MSBuild\Current\Bin\MSBuild.exe" src\MxGateway.Worker\MxGateway.Worker.csproj /p:Configuration=Debug /p:Platform=x86
```
Run the worker tests with the same platform target:
```powershell
dotnet test src\MxGateway.Worker.Tests\MxGateway.Worker.Tests.csproj -p:Platform=x86
```
The only MXAccess interop reference belongs in `MxGateway.Worker`. Gateway and
test projects may reference the worker project for metadata and scaffold tests,
but they must not reference `ArchestrA.MXAccess.dll` directly.
## Responsibilities
The worker owns:
@@ -87,6 +114,21 @@ Startup sequence:
If validation fails before MXAccess creation, exit quickly with a non-zero exit
code. If MXAccess creation fails, send `WorkerFault` when possible and exit.
The bootstrap layer returns structured exit codes before it creates pipes,
starts the STA, or touches MXAccess:
| Exit code | Name | Meaning |
|-----------|------|---------|
| `0` | `Success` | Required bootstrap options are valid. |
| `1` | `UnexpectedFailure` | A non-bootstrap exception reaches the process boundary. |
| `2` | `InvalidArguments` | Required arguments are missing or unknown arguments are present. |
| `3` | `InvalidProtocolVersion` | `--protocol-version` is not numeric or does not match the supported worker protocol. |
| `4` | `MissingNonce` | `MXGATEWAY_WORKER_NONCE` is absent or empty. |
Bootstrap logs use `WorkerConsoleLogger` key/value output. `WorkerLogRedactor`
redacts fields whose names indicate nonce, secret, password, token,
credential, or API key values before the message is written.
## Internal Components
```text
@@ -208,6 +250,17 @@ The loop should update a heartbeat timestamp after:
- finishing a command,
- processing an MXAccess event.
`StaRuntime` implements this runtime boundary in the worker. It starts one
background thread named `MxGateway.Worker.STA`, sets it to `ApartmentState.STA`,
initializes COM through `StaComApartmentInitializer`, and runs
`StaMessagePump`. Commands are scheduled through `InvokeAsync`; the command
queue signals an `AutoResetEvent` so `MsgWaitForMultipleObjectsEx` can wake the
STA without busy-waiting. `LastActivityUtc` records pump, command, startup, and
shutdown activity so the future heartbeat/watchdog can report whether the STA
is still responsive. Shutdown marks the runtime as closing, wakes the pump,
rejects new commands, cancels queued work, uninitializes COM on the STA, and
waits for the thread to exit.
## COM Creation
The MXAccess analysis source at `C:\Users\dohertj2\Desktop\mxaccess` identifies
@@ -236,6 +289,16 @@ The worker should reference the interop assembly and instantiate
`LMXProxyServerClass` on the dedicated STA thread. Keep the ProgID and assembly
path configurable for diagnostics, but this COM class is the v1 default.
`MxAccessStaSession` owns the initial COM creation path. It starts `StaRuntime`,
creates `LMXProxyServerClass` through `MxAccessComObjectFactory` on the STA,
attaches `MxAccessBaseEventSink`, and returns `WorkerReady` only after those
steps succeed. `MxAccessSession` keeps the raw COM object private, records the
STA managed thread id that created it, detaches the base event sink during
disposal, and releases the COM reference on the STA. After creation,
`MxAccessStaSession` owns a `StaCommandDispatcher` backed by
`MxAccessCommandExecutor`; `DispatchAsync` queues contract commands back to the
same STA instead of exposing the COM object to callers.
Creation rules:
- Create COM object only on the STA.
@@ -253,6 +316,11 @@ If COM creation fails, the worker should send a structured fault with:
- worker process id,
- session id.
`WorkerPipeSession` maps startup exceptions from this path to
`WorkerFaultCategory.MxaccessCreationFailed`, includes the captured HRESULT
when the exception exposes one, and does not send `WorkerReady` after a failed
COM creation attempt.
## Event Sink
The worker must subscribe to every public MXAccess event family:
@@ -349,6 +417,21 @@ Diagnostics:
Implement method-specific dispatch instead of a generic string method invoker.
Parity tests need stable command-specific request and reply shapes.
`MxAccessCommandExecutor` implements the first command pair:
- `Register` calls `LMXProxyServerClass.Register` with the requested client
name and preserves the returned server handle in both `ReturnValue` and
`RegisterReply.ServerHandle`.
- `Unregister` calls `LMXProxyServerClass.Unregister` with the requested server
handle. The reply has no method-specific payload because the public MXAccess
method returns `void`.
Both commands set `Hresult` to `0` only after the COM call returns normally.
COM exceptions flow through `StaCommandDispatcher`, which captures the thrown
HRESULT and converts the reply to `ProtocolStatusCode.MxaccessFailure`.
`MxAccessStaSession.GetRegisteredServerHandlesAsync` returns an STA-read
snapshot of tracked server handles for diagnostics and future cleanup logic.
## Handle Registry
The worker should track MXAccess state for diagnostics and cleanup, while still
@@ -369,6 +452,8 @@ Rules:
- Do not invent handles.
- Do not rewrite handles returned by MXAccess.
- Record server handles only after `Register` succeeds.
- Remove server handles only after `Unregister` succeeds.
- Preserve invalid-handle behavior from MXAccess.
- Preserve cross-server handle behavior from MXAccess.
- Use registry state for cleanup and diagnostics, not semantic correction.
+43 -11
View File
@@ -47,6 +47,8 @@ Detailed follow-up docs:
security, observability, and test strategy.
- `docs/WorkerFrameProtocol.md` covers the gateway-side named-pipe frame
reader/writer and `WorkerEnvelope` validation rules.
- `docs/WorkerProcessLauncher.md` covers worker executable validation, process
launch arguments, nonce handling, and startup cleanup behavior.
- `docs/mxaccess-worker-instance-design.md` covers each .NET Framework 4.8 x86
MXAccess worker instance, including STA ownership, message pumping, COM
lifetime, command dispatch, event sinks, conversion, and shutdown.
@@ -105,6 +107,15 @@ worker, correlation, command, and client identity fields with redaction applied
before values enter log state. `GatewayMetrics` exposes counters, gauges, and
histograms through .NET `Meter` and a snapshot API that dashboard services can
project without binding to a metrics exporter.
`DashboardSnapshotService` projects sessions, workers, metrics, faults, and
effective configuration into immutable DTOs for read-only dashboard rendering.
Dashboard routes use the same API-key verifier as gRPC. `/dashboard/login`
accepts the API key in a form body, validates the configured `admin` scope,
and issues an HTTP-only secure cookie for subsequent dashboard requests.
`/dashboard/logout` clears that cookie. Login and logout posts validate
anti-forgery tokens, and API keys are never accepted through query strings.
`Dashboard:AllowAnonymousLocalhost` can bypass the cookie requirement for
loopback requests only when explicitly enabled.
### Worker Process
@@ -516,11 +527,7 @@ Worker policy:
- bounded outbound event channel,
- never block MXAccess event handler on pipe writes,
- if the outbound channel is full, apply configured policy:
- disconnect session,
- drop oldest low-priority data-change events,
- coalesce data changes by item handle,
- or block briefly then fault.
- fail the worker session when the outbound channel is full.
For full parity testing, default should be fail-fast rather than silent drop.
For production high-rate telemetry, add explicit coalescing modes.
@@ -529,9 +536,15 @@ Gateway policy:
- one event sequencer per session,
- preserve per-session event order,
- support multiple client event subscribers only if explicitly required,
- apply backpressure from slow gRPC streams,
- disconnect or coalesce according to client-selected mode.
- allow one active client event subscriber per session,
- reject a second subscriber with a clear session error,
- use a bounded `EventStreamService` queue between worker events and gRPC
writes,
- fault the session when the bounded stream queue overflows,
- detach the subscriber when the stream is canceled.
The gateway forwards only events reported by the worker. It does not synthesize
`OperationComplete` from write completion, command replies, or status frames.
## Isolation And Fault Handling
@@ -564,9 +577,13 @@ Because each client owns one worker, a crash or leak affects only that session.
External gateway:
- use TLS for remote gRPC if crossing machine boundaries,
- authenticate clients with Windows auth, mTLS, or a deployment-specific token,
- authorize access to commands that can write, authenticate users, or alter
runtime state.
- authenticate v1 gRPC clients with `authorization: Bearer
mxgw_<key-id>_<secret>` API-key metadata,
- reject missing or invalid API keys with gRPC `Unauthenticated`,
- reject valid keys that lack the required session, invoke, event, metadata, or
admin scope with gRPC `PermissionDenied`,
- authorize access to commands that can write, authenticate users, expose
metadata, stream events, or alter runtime state.
Internal worker IPC:
@@ -793,6 +810,12 @@ Core operations:
- track worker state,
- close or kill worker.
The gateway implementation keeps sessions in an in-memory `SessionRegistry`
keyed by session id. `SessionManager` owns the state machine, creates
per-session pipe names and nonces, starts the worker through the worker-client
factory, gates commands to `Ready` sessions, exposes lease-close hooks, and
cleans up workers during gateway shutdown.
State machine:
```text
@@ -840,6 +863,15 @@ The gRPC layer should be thin:
Avoid embedding MXAccess-specific business logic in gRPC handlers. Keep the
translation code testable.
The gateway maps `MxAccessGateway` to `MxAccessGatewayService`. The service
implements `OpenSession`, `CloseSession`, `Invoke`, and `StreamEvents` by
validating public requests, delegating session work to `ISessionManager`, and
using explicit mapper code for public-to-worker commands and worker replies.
`StreamEvents` delegates subscriber ownership, ordering, and backpressure to
`EventStreamService`. Missing sessions and transport failures return gRPC
status errors; worker command replies preserve MXAccess HRESULT and status
details in the public reply.
## C# Worker Versus C++ Worker
Start with a C# .NET Framework 4.8 x86 worker.
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<TargetFrameworks>net10.0;net48</TargetFrameworks>
</PropertyGroup>
<ItemGroup>
@@ -17,6 +17,7 @@
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="System.Runtime.CompilerServices.Unsafe" Version="6.1.2" />
</ItemGroup>
</Project>
@@ -0,0 +1,10 @@
namespace MxGateway.Server.Dashboard;
public static class DashboardAuthenticationDefaults
{
public const string AuthenticationScheme = "MxGateway.Dashboard";
public const string AuthorizationPolicy = "MxGateway.Dashboard";
public const string ScopeClaimType = "scope";
public const string KeyPrefixClaimType = "mxgateway:key_prefix";
public const string CookieName = "__Host-MxGatewayDashboard";
}
@@ -0,0 +1,19 @@
using System.Security.Claims;
namespace MxGateway.Server.Dashboard;
public sealed record DashboardAuthenticationResult(
bool Succeeded,
ClaimsPrincipal? Principal,
string? FailureMessage)
{
public static DashboardAuthenticationResult Success(ClaimsPrincipal principal)
{
return new DashboardAuthenticationResult(true, principal, null);
}
public static DashboardAuthenticationResult Fail(string failureMessage)
{
return new DashboardAuthenticationResult(false, null, failureMessage);
}
}
@@ -0,0 +1,81 @@
using System.Security.Claims;
using Microsoft.Extensions.Options;
using MxGateway.Server.Configuration;
using MxGateway.Server.Security.Authentication;
using MxGateway.Server.Security.Authorization;
namespace MxGateway.Server.Dashboard;
public sealed class DashboardAuthenticator(
IApiKeyVerifier apiKeyVerifier,
IOptions<GatewayOptions> options) : IDashboardAuthenticator
{
private const string GenericFailureMessage = "The API key is invalid or is not authorized for dashboard access.";
public async Task<DashboardAuthenticationResult> AuthenticateAsync(
string? apiKey,
CancellationToken cancellationToken)
{
if (options.Value.Authentication.Mode == AuthenticationMode.Disabled)
{
return DashboardAuthenticationResult.Success(CreatePrincipal(new ApiKeyIdentity(
KeyId: "authentication-disabled",
KeyPrefix: "authentication-disabled",
DisplayName: "Authentication Disabled",
Scopes: new HashSet<string>([GatewayScopes.Admin], StringComparer.Ordinal))));
}
if (string.IsNullOrWhiteSpace(apiKey))
{
return DashboardAuthenticationResult.Fail(GenericFailureMessage);
}
ApiKeyVerificationResult verificationResult = await apiKeyVerifier
.VerifyAsync(FormatAuthorizationHeader(apiKey), cancellationToken)
.ConfigureAwait(false);
if (!verificationResult.Succeeded || verificationResult.Identity is null)
{
return DashboardAuthenticationResult.Fail(GenericFailureMessage);
}
if (options.Value.Dashboard.RequireAdminScope
&& !verificationResult.Identity.Scopes.Contains(GatewayScopes.Admin))
{
return DashboardAuthenticationResult.Fail(GenericFailureMessage);
}
return DashboardAuthenticationResult.Success(CreatePrincipal(verificationResult.Identity));
}
private static string FormatAuthorizationHeader(string apiKey)
{
string trimmedApiKey = apiKey.Trim();
return trimmedApiKey.StartsWith("Bearer ", StringComparison.OrdinalIgnoreCase)
? trimmedApiKey
: $"Bearer {trimmedApiKey}";
}
private static ClaimsPrincipal CreatePrincipal(ApiKeyIdentity identity)
{
List<Claim> claims =
[
new Claim(ClaimTypes.NameIdentifier, identity.KeyId),
new Claim(ClaimTypes.Name, identity.DisplayName),
new Claim(DashboardAuthenticationDefaults.KeyPrefixClaimType, identity.KeyPrefix)
];
claims.AddRange(identity.Scopes.Select(scope => new Claim(
DashboardAuthenticationDefaults.ScopeClaimType,
scope)));
ClaimsIdentity claimsIdentity = new(
claims,
DashboardAuthenticationDefaults.AuthenticationScheme,
ClaimTypes.Name,
DashboardAuthenticationDefaults.ScopeClaimType);
return new ClaimsPrincipal(claimsIdentity);
}
}
@@ -0,0 +1,59 @@
using System.Net;
using Microsoft.AspNetCore.Authorization;
using Microsoft.Extensions.Options;
using MxGateway.Server.Configuration;
using MxGateway.Server.Security.Authorization;
namespace MxGateway.Server.Dashboard;
public sealed class DashboardAuthorizationHandler(
IHttpContextAccessor httpContextAccessor,
IOptions<GatewayOptions> options) : AuthorizationHandler<DashboardAuthorizationRequirement>
{
protected override Task HandleRequirementAsync(
AuthorizationHandlerContext context,
DashboardAuthorizationRequirement requirement)
{
GatewayOptions gatewayOptions = options.Value;
if (gatewayOptions.Authentication.Mode == AuthenticationMode.Disabled)
{
context.Succeed(requirement);
return Task.CompletedTask;
}
if (gatewayOptions.Dashboard.AllowAnonymousLocalhost && IsLoopbackRequest())
{
context.Succeed(requirement);
return Task.CompletedTask;
}
if (context.User.Identity?.IsAuthenticated != true)
{
return Task.CompletedTask;
}
if (!gatewayOptions.Dashboard.RequireAdminScope || HasAdminScope(context))
{
context.Succeed(requirement);
}
return Task.CompletedTask;
}
private bool IsLoopbackRequest()
{
IPAddress? remoteAddress = httpContextAccessor.HttpContext?.Connection.RemoteIpAddress;
return remoteAddress is not null && IPAddress.IsLoopback(remoteAddress);
}
private static bool HasAdminScope(AuthorizationHandlerContext context)
{
return context.User.HasClaim(
DashboardAuthenticationDefaults.ScopeClaimType,
GatewayScopes.Admin);
}
}
@@ -0,0 +1,5 @@
using Microsoft.AspNetCore.Authorization;
namespace MxGateway.Server.Dashboard;
public sealed class DashboardAuthorizationRequirement : IAuthorizationRequirement;
@@ -0,0 +1,217 @@
using System.Text.Encodings.Web;
using Microsoft.AspNetCore.Antiforgery;
using Microsoft.AspNetCore.Authentication;
using Microsoft.AspNetCore.Http.HttpResults;
using MxGateway.Server.Configuration;
namespace MxGateway.Server.Dashboard;
public static class DashboardEndpointRouteBuilderExtensions
{
public static IEndpointRouteBuilder MapGatewayDashboard(this IEndpointRouteBuilder endpoints)
{
IConfiguration configuration = endpoints.ServiceProvider.GetRequiredService<IConfiguration>();
IConfigurationSection dashboardSection = configuration
.GetSection($"{GatewayOptions.SectionName}:Dashboard");
if (bool.TryParse(dashboardSection["Enabled"], out bool enabled) && !enabled)
{
return endpoints;
}
string pathBase = NormalizePathBase(dashboardSection["PathBase"] ?? new DashboardOptions().PathBase);
RouteGroupBuilder dashboard = endpoints.MapGroup(pathBase);
dashboard.MapGet(
"/",
(HttpContext httpContext, IAntiforgery antiforgery, IDashboardSnapshotService snapshotService) =>
GetDashboardHomeAsync(httpContext, antiforgery, snapshotService, pathBase))
.RequireAuthorization(DashboardAuthenticationDefaults.AuthorizationPolicy)
.WithName("DashboardHome");
dashboard.MapGet(
"/login",
(HttpContext httpContext, IAntiforgery antiforgery) => GetLoginAsync(httpContext, antiforgery, pathBase))
.AllowAnonymous()
.WithName("DashboardLogin");
dashboard.MapPost(
"/login",
(HttpContext httpContext, IAntiforgery antiforgery, IDashboardAuthenticator authenticator) =>
PostLoginAsync(httpContext, antiforgery, authenticator, pathBase))
.AllowAnonymous()
.WithName("DashboardLoginPost");
dashboard.MapPost(
"/logout",
(HttpContext httpContext, IAntiforgery antiforgery) => PostLogoutAsync(httpContext, antiforgery, pathBase))
.RequireAuthorization(DashboardAuthenticationDefaults.AuthorizationPolicy)
.WithName("DashboardLogout");
dashboard.MapGet("/denied", () => Results.Content(
RenderPage("Access denied", "<p>The signed-in API key is not authorized for dashboard access.</p>"),
"text/html"))
.AllowAnonymous()
.WithName("DashboardAccessDenied");
return endpoints;
}
private static ContentHttpResult GetDashboardHomeAsync(
HttpContext httpContext,
IAntiforgery antiforgery,
IDashboardSnapshotService snapshotService,
string pathBase)
{
AntiforgeryTokenSet tokens = antiforgery.GetAndStoreTokens(httpContext);
DashboardSnapshot snapshot = snapshotService.GetSnapshot();
string requestToken = tokens.RequestToken ?? string.Empty;
string body = $"""
<form method="post" action="{HtmlEncoder.Default.Encode(pathBase + "/logout")}" class="mb-3">
<input name="{tokens.FormFieldName}" type="hidden" value="{HtmlEncoder.Default.Encode(requestToken)}" />
<button type="submit">Sign out</button>
</form>
<dl>
<dt>Open sessions</dt>
<dd>{snapshot.Sessions.Count}</dd>
<dt>Workers</dt>
<dd>{snapshot.Workers.Count}</dd>
<dt>Faults</dt>
<dd>{snapshot.Faults.Count}</dd>
</dl>
""";
return TypedResults.Content(RenderPage("MXAccess Gateway Dashboard", body), "text/html");
}
private static Task<ContentHttpResult> GetLoginAsync(
HttpContext httpContext,
IAntiforgery antiforgery,
string pathBase)
{
string returnUrl = SanitizeReturnUrl(
httpContext.Request.Query["returnUrl"].ToString(),
pathBase);
return Task.FromResult(TypedResults.Content(
RenderLoginPage(httpContext, antiforgery, returnUrl, pathBase, failureMessage: null),
"text/html"));
}
private static async Task<IResult> PostLoginAsync(
HttpContext httpContext,
IAntiforgery antiforgery,
IDashboardAuthenticator authenticator,
string pathBase)
{
await antiforgery.ValidateRequestAsync(httpContext).ConfigureAwait(false);
IFormCollection form = await httpContext.Request
.ReadFormAsync(httpContext.RequestAborted)
.ConfigureAwait(false);
string returnUrl = SanitizeReturnUrl(
form["returnUrl"].ToString(),
pathBase);
DashboardAuthenticationResult result = await authenticator
.AuthenticateAsync(form["apiKey"].ToString(), httpContext.RequestAborted)
.ConfigureAwait(false);
if (!result.Succeeded || result.Principal is null)
{
return TypedResults.Content(
RenderLoginPage(httpContext, antiforgery, returnUrl, pathBase, result.FailureMessage),
"text/html",
statusCode: StatusCodes.Status401Unauthorized);
}
await httpContext
.SignInAsync(DashboardAuthenticationDefaults.AuthenticationScheme, result.Principal)
.ConfigureAwait(false);
return Results.LocalRedirect(returnUrl);
}
private static async Task<IResult> PostLogoutAsync(
HttpContext httpContext,
IAntiforgery antiforgery,
string pathBase)
{
await antiforgery.ValidateRequestAsync(httpContext).ConfigureAwait(false);
await httpContext
.SignOutAsync(DashboardAuthenticationDefaults.AuthenticationScheme)
.ConfigureAwait(false);
return Results.LocalRedirect($"{pathBase}/login");
}
private static string RenderLoginPage(
HttpContext httpContext,
IAntiforgery antiforgery,
string returnUrl,
string pathBase,
string? failureMessage)
{
AntiforgeryTokenSet tokens = antiforgery.GetAndStoreTokens(httpContext);
string requestToken = tokens.RequestToken ?? string.Empty;
string alert = string.IsNullOrWhiteSpace(failureMessage)
? string.Empty
: $"<p role=\"alert\">{HtmlEncoder.Default.Encode(failureMessage)}</p>";
string body = $"""
{alert}
<form method="post" action="{HtmlEncoder.Default.Encode(pathBase + "/login")}">
<input name="{tokens.FormFieldName}" type="hidden" value="{HtmlEncoder.Default.Encode(requestToken)}" />
<input name="returnUrl" type="hidden" value="{HtmlEncoder.Default.Encode(returnUrl)}" />
<label for="apiKey">API key</label>
<input id="apiKey" name="apiKey" type="password" autocomplete="off" />
<button type="submit">Sign in</button>
</form>
""";
return RenderPage("Dashboard Sign In", body);
}
private static string RenderPage(string title, string body)
{
return $"""
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>{HtmlEncoder.Default.Encode(title)}</title>
</head>
<body>
<main>
<h1>{HtmlEncoder.Default.Encode(title)}</h1>
{body}
</main>
</body>
</html>
""";
}
private static string NormalizePathBase(string pathBase)
{
string normalized = pathBase.TrimEnd('/');
return string.IsNullOrWhiteSpace(normalized) || !normalized.StartsWith("/", StringComparison.Ordinal)
? "/dashboard"
: normalized;
}
private static string SanitizeReturnUrl(string? returnUrl, string pathBase)
{
if (string.IsNullOrWhiteSpace(returnUrl)
|| !returnUrl.StartsWith("/", StringComparison.Ordinal)
|| returnUrl.StartsWith("//", StringComparison.Ordinal)
|| !returnUrl.StartsWith(pathBase, StringComparison.OrdinalIgnoreCase)
|| Uri.TryCreate(returnUrl, UriKind.Absolute, out _))
{
return pathBase;
}
return returnUrl;
}
}
@@ -0,0 +1,9 @@
namespace MxGateway.Server.Dashboard;
public sealed record DashboardFaultSummary(
string Source,
string? SessionId,
int? WorkerProcessId,
string State,
string Message,
DateTimeOffset ObservedAt);
@@ -0,0 +1,6 @@
namespace MxGateway.Server.Dashboard;
public sealed record DashboardMetricSummary(
string Name,
long Value,
string? Dimension = null);
@@ -0,0 +1,34 @@
using MxGateway.Server.Diagnostics;
namespace MxGateway.Server.Dashboard;
internal static class DashboardRedactor
{
private static readonly string[] SensitiveTextMarkers =
[
"apikey",
"api_key",
"authorization",
"credential",
"password",
"secret",
"token",
];
public static string? Redact(string? value)
{
if (string.IsNullOrWhiteSpace(value))
{
return value;
}
if (value.Contains("mxgw_", StringComparison.OrdinalIgnoreCase))
{
return GatewayLogRedactor.RedactClientIdentity(value);
}
return SensitiveTextMarkers.Any(marker => value.Contains(marker, StringComparison.OrdinalIgnoreCase))
? GatewayLogRedactor.RedactedValue
: value;
}
}
@@ -0,0 +1,53 @@
using Microsoft.AspNetCore.Authentication.Cookies;
using Microsoft.AspNetCore.Authorization;
using Microsoft.Extensions.Options;
using MxGateway.Server.Configuration;
namespace MxGateway.Server.Dashboard;
public static class DashboardServiceCollectionExtensions
{
public static IServiceCollection AddGatewayDashboard(this IServiceCollection services)
{
services.AddSingleton<IDashboardSnapshotService, DashboardSnapshotService>();
services.AddSingleton<IDashboardAuthenticator, DashboardAuthenticator>();
services.AddHttpContextAccessor();
services.AddAntiforgery();
services
.AddAuthentication(DashboardAuthenticationDefaults.AuthenticationScheme)
.AddCookie(DashboardAuthenticationDefaults.AuthenticationScheme);
services.AddOptions<CookieAuthenticationOptions>(DashboardAuthenticationDefaults.AuthenticationScheme)
.Configure<IOptions<GatewayOptions>>(ConfigureCookieOptions);
services.AddAuthorization(options =>
{
options.AddPolicy(
DashboardAuthenticationDefaults.AuthorizationPolicy,
policy => policy.AddRequirements(new DashboardAuthorizationRequirement()));
});
services.AddSingleton<IAuthorizationHandler, DashboardAuthorizationHandler>();
return services;
}
private static void ConfigureCookieOptions(
CookieAuthenticationOptions cookieOptions,
IOptions<GatewayOptions> gatewayOptions)
{
string pathBase = gatewayOptions.Value.Dashboard.PathBase.TrimEnd('/');
if (string.IsNullOrWhiteSpace(pathBase))
{
pathBase = "/dashboard";
}
cookieOptions.Cookie.Name = DashboardAuthenticationDefaults.CookieName;
cookieOptions.Cookie.HttpOnly = true;
cookieOptions.Cookie.SecurePolicy = CookieSecurePolicy.Always;
cookieOptions.Cookie.SameSite = SameSiteMode.Strict;
cookieOptions.Cookie.Path = "/";
cookieOptions.LoginPath = $"{pathBase}/login";
cookieOptions.LogoutPath = $"{pathBase}/logout";
cookieOptions.AccessDeniedPath = $"{pathBase}/denied";
cookieOptions.ExpireTimeSpan = TimeSpan.FromHours(8);
cookieOptions.SlidingExpiration = true;
}
}
@@ -0,0 +1,19 @@
using MxGateway.Contracts.Proto;
using MxGateway.Server.Workers;
namespace MxGateway.Server.Dashboard;
public sealed record DashboardSessionSummary(
string SessionId,
string BackendName,
SessionState State,
string? ClientIdentity,
string? ClientSessionName,
string? ClientCorrelationId,
DateTimeOffset OpenedAt,
DateTimeOffset LastClientActivityAt,
DateTimeOffset? LeaseExpiresAt,
int? WorkerProcessId,
WorkerClientState? WorkerState,
DateTimeOffset? LastWorkerHeartbeatAt,
string? LastFault);
@@ -0,0 +1,15 @@
using MxGateway.Server.Configuration;
namespace MxGateway.Server.Dashboard;
public sealed record DashboardSnapshot(
DateTimeOffset GeneratedAt,
DateTimeOffset GatewayStartedAt,
TimeSpan GatewayUptime,
string GatewayStatus,
string GatewayVersion,
IReadOnlyList<DashboardSessionSummary> Sessions,
IReadOnlyList<DashboardWorkerSummary> Workers,
IReadOnlyList<DashboardMetricSummary> Metrics,
IReadOnlyList<DashboardFaultSummary> Faults,
EffectiveGatewayConfiguration Configuration);
@@ -0,0 +1,196 @@
using System.Runtime.CompilerServices;
using Microsoft.Extensions.Options;
using MxGateway.Server.Configuration;
using MxGateway.Server.Metrics;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
namespace MxGateway.Server.Dashboard;
public sealed class DashboardSnapshotService : IDashboardSnapshotService
{
private const string HealthyStatus = "Healthy";
private readonly ISessionRegistry _sessionRegistry;
private readonly GatewayMetrics _metrics;
private readonly IGatewayConfigurationProvider _configurationProvider;
private readonly TimeProvider _timeProvider;
private readonly DateTimeOffset _gatewayStartedAt;
private readonly TimeSpan _snapshotInterval;
private readonly int _recentFaultLimit;
private readonly int _recentSessionLimit;
public DashboardSnapshotService(
ISessionRegistry sessionRegistry,
GatewayMetrics metrics,
IGatewayConfigurationProvider configurationProvider,
IOptions<GatewayOptions> options,
TimeProvider? timeProvider = null)
{
_sessionRegistry = sessionRegistry ?? throw new ArgumentNullException(nameof(sessionRegistry));
_metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
_configurationProvider = configurationProvider ?? throw new ArgumentNullException(nameof(configurationProvider));
ArgumentNullException.ThrowIfNull(options);
_timeProvider = timeProvider ?? TimeProvider.System;
_gatewayStartedAt = _timeProvider.GetUtcNow();
_snapshotInterval = TimeSpan.FromMilliseconds(options.Value.Dashboard.SnapshotIntervalMilliseconds);
_recentFaultLimit = options.Value.Dashboard.RecentFaultLimit;
_recentSessionLimit = options.Value.Dashboard.RecentSessionLimit;
}
public DashboardSnapshot GetSnapshot()
{
DateTimeOffset generatedAt = _timeProvider.GetUtcNow();
IReadOnlyList<GatewaySession> sessions = _sessionRegistry.Snapshot()
.OrderByDescending(session => session.OpenedAt)
.ToArray();
IReadOnlyList<DashboardSessionSummary> sessionSummaries = sessions
.Take(ResolveLimit(_recentSessionLimit))
.Select(CreateSessionSummary)
.ToArray();
IReadOnlyList<DashboardWorkerSummary> workerSummaries = sessions
.Where(session => session.WorkerClient is not null)
.Select(CreateWorkerSummary)
.ToArray();
GatewayMetricsSnapshot metricsSnapshot = _metrics.GetSnapshot();
return new DashboardSnapshot(
GeneratedAt: generatedAt,
GatewayStartedAt: _gatewayStartedAt,
GatewayUptime: generatedAt - _gatewayStartedAt,
GatewayStatus: HealthyStatus,
GatewayVersion: typeof(DashboardSnapshotService).Assembly.GetName().Version?.ToString() ?? "unknown",
Sessions: sessionSummaries,
Workers: workerSummaries,
Metrics: CreateMetricSummaries(metricsSnapshot),
Faults: CreateFaultSummaries(sessions, generatedAt),
Configuration: _configurationProvider.GetEffectiveConfiguration());
}
public async IAsyncEnumerable<DashboardSnapshot> WatchSnapshotsAsync(
[EnumeratorCancellation] CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
yield break;
}
yield return GetSnapshot();
using PeriodicTimer timer = new(_snapshotInterval, _timeProvider);
while (true)
{
bool hasNext;
try
{
hasNext = await timer.WaitForNextTickAsync(cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
yield break;
}
if (!hasNext)
{
yield break;
}
yield return GetSnapshot();
}
}
private static DashboardSessionSummary CreateSessionSummary(GatewaySession session)
{
IWorkerClient? workerClient = session.WorkerClient;
return new DashboardSessionSummary(
SessionId: session.SessionId,
BackendName: session.BackendName,
State: session.State,
ClientIdentity: DashboardRedactor.Redact(session.ClientIdentity),
ClientSessionName: DashboardRedactor.Redact(session.ClientSessionName),
ClientCorrelationId: DashboardRedactor.Redact(session.ClientCorrelationId),
OpenedAt: session.OpenedAt,
LastClientActivityAt: session.LastClientActivityAt,
LeaseExpiresAt: session.LeaseExpiresAt,
WorkerProcessId: workerClient?.ProcessId,
WorkerState: workerClient?.State,
LastWorkerHeartbeatAt: workerClient?.LastHeartbeatAt,
LastFault: DashboardRedactor.Redact(session.FinalFault));
}
private static DashboardWorkerSummary CreateWorkerSummary(GatewaySession session)
{
IWorkerClient workerClient = session.WorkerClient!;
return new DashboardWorkerSummary(
SessionId: session.SessionId,
ProcessId: workerClient.ProcessId,
State: workerClient.State,
LastHeartbeatAt: workerClient.LastHeartbeatAt,
LastFault: DashboardRedactor.Redact(session.FinalFault));
}
private static IReadOnlyList<DashboardMetricSummary> CreateMetricSummaries(GatewayMetricsSnapshot snapshot)
{
List<DashboardMetricSummary> metrics =
[
new("mxgateway.sessions.open", snapshot.OpenSessions),
new("mxgateway.workers.running", snapshot.WorkersRunning),
new("mxgateway.events.queue.depth", snapshot.EventQueueDepth),
new("mxgateway.sessions.opened", snapshot.SessionsOpened),
new("mxgateway.sessions.closed", snapshot.SessionsClosed),
new("mxgateway.commands.started", snapshot.CommandsStarted),
new("mxgateway.commands.succeeded", snapshot.CommandsSucceeded),
new("mxgateway.commands.failed", snapshot.CommandsFailed),
new("mxgateway.events.received", snapshot.EventsReceived),
new("mxgateway.queues.overflows", snapshot.QueueOverflows),
new("mxgateway.faults", snapshot.Faults),
new("mxgateway.workers.killed", snapshot.WorkerKills),
new("mxgateway.workers.exited", snapshot.WorkerExits),
new("mxgateway.heartbeats.failed", snapshot.HeartbeatFailures),
new("mxgateway.grpc.streams.disconnected", snapshot.StreamDisconnects),
];
metrics.AddRange(snapshot.CommandFailuresByMethod
.OrderBy(entry => entry.Key, StringComparer.OrdinalIgnoreCase)
.Select(entry => new DashboardMetricSummary("mxgateway.commands.failed", entry.Value, entry.Key)));
metrics.AddRange(snapshot.EventsByFamily
.OrderBy(entry => entry.Key, StringComparer.OrdinalIgnoreCase)
.Select(entry => new DashboardMetricSummary("mxgateway.events.received", entry.Value, entry.Key)));
return metrics;
}
private IReadOnlyList<DashboardFaultSummary> CreateFaultSummaries(
IReadOnlyList<GatewaySession> sessions,
DateTimeOffset generatedAt)
{
return sessions
.Where(HasFault)
.Take(ResolveLimit(_recentFaultLimit))
.Select(session => new DashboardFaultSummary(
Source: session.WorkerClient?.State == WorkerClientState.Faulted ? "Worker" : "Session",
SessionId: session.SessionId,
WorkerProcessId: session.WorkerProcessId,
State: session.WorkerClient?.State == WorkerClientState.Faulted
? WorkerClientState.Faulted.ToString()
: session.State.ToString(),
Message: DashboardRedactor.Redact(session.FinalFault) ?? "Faulted",
ObservedAt: generatedAt))
.ToArray();
}
private static bool HasFault(GatewaySession session)
{
return session.State == MxGateway.Contracts.Proto.SessionState.Faulted
|| session.WorkerClient?.State == WorkerClientState.Faulted
|| !string.IsNullOrWhiteSpace(session.FinalFault);
}
private static int ResolveLimit(int configuredLimit)
{
return configuredLimit < 0 ? 0 : configuredLimit;
}
}
@@ -0,0 +1,10 @@
using MxGateway.Server.Workers;
namespace MxGateway.Server.Dashboard;
public sealed record DashboardWorkerSummary(
string SessionId,
int? ProcessId,
WorkerClientState State,
DateTimeOffset LastHeartbeatAt,
string? LastFault);
@@ -0,0 +1,8 @@
namespace MxGateway.Server.Dashboard;
public interface IDashboardAuthenticator
{
Task<DashboardAuthenticationResult> AuthenticateAsync(
string? apiKey,
CancellationToken cancellationToken);
}
@@ -0,0 +1,8 @@
namespace MxGateway.Server.Dashboard;
public interface IDashboardSnapshotService
{
DashboardSnapshot GetSnapshot();
IAsyncEnumerable<DashboardSnapshot> WatchSnapshotsAsync(CancellationToken cancellationToken);
}
@@ -1,7 +1,13 @@
using MxGateway.Contracts;
using MxGateway.Server.Configuration;
using MxGateway.Server.Dashboard;
using MxGateway.Server.Diagnostics;
using MxGateway.Server.Grpc;
using MxGateway.Server.Metrics;
using MxGateway.Server.Security.Authentication;
using MxGateway.Server.Security.Authorization;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
namespace MxGateway.Server;
@@ -13,6 +19,8 @@ public static class GatewayApplication
WebApplication app = builder.Build();
app.UseGatewayRequestLoggingScope();
app.UseAuthentication();
app.UseAuthorization();
app.MapGatewayEndpoints();
return app;
@@ -23,8 +31,16 @@ public static class GatewayApplication
WebApplicationBuilder builder = WebApplication.CreateBuilder(args);
builder.Services.AddGatewayConfiguration();
builder.Services.AddSqliteAuthStore();
builder.Services.AddGatewayGrpcAuthorization();
builder.Services.AddHealthChecks();
builder.Services.AddSingleton<GatewayMetrics>();
builder.Services.AddSingleton<MxAccessGrpcMapper>();
builder.Services.AddSingleton<MxAccessGrpcRequestValidator>();
builder.Services.AddSingleton<IEventStreamService, EventStreamService>();
builder.Services.AddWorkerProcessLauncher();
builder.Services.AddGatewaySessions();
builder.Services.AddGatewayDashboard();
return builder;
}
@@ -41,6 +57,9 @@ public static class GatewayApplication
WorkerProtocolVersion: GatewayContractInfo.WorkerProtocolVersion)))
.WithName("LiveHealth");
endpoints.MapGrpcService<MxAccessGatewayService>();
endpoints.MapGatewayDashboard();
return endpoints;
}
}
@@ -0,0 +1,140 @@
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Microsoft.Extensions.Options;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Configuration;
using MxGateway.Server.Metrics;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
namespace MxGateway.Server.Grpc;
public sealed class EventStreamService(
ISessionManager sessionManager,
IOptions<GatewayOptions> options,
MxAccessGrpcMapper mapper,
GatewayMetrics metrics,
ILogger<EventStreamService> logger) : IEventStreamService
{
public async IAsyncEnumerable<MxEvent> StreamEventsAsync(
StreamEventsRequest request,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
if (!sessionManager.TryGetSession(request.SessionId, out GatewaySession session))
{
throw new SessionManagerException(
SessionManagerErrorCode.SessionNotFound,
$"Session {request.SessionId} was not found.");
}
using IDisposable subscriber = session.AttachEventSubscriber(
options.Value.Sessions.AllowMultipleEventSubscribers);
using CancellationTokenSource streamCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
int streamQueueDepth = 0;
Channel<MxEvent> eventQueue = Channel.CreateBounded<MxEvent>(
new BoundedChannelOptions(options.Value.Events.QueueCapacity)
{
SingleReader = true,
SingleWriter = true,
FullMode = BoundedChannelFullMode.Wait,
AllowSynchronousContinuations = false,
});
Task producerTask = ProduceEventsAsync(
session,
request.AfterWorkerSequence,
eventQueue.Writer,
() =>
{
int depth = Interlocked.Increment(ref streamQueueDepth);
metrics.SetEventQueueDepth(depth);
},
streamCts.Token);
try
{
await foreach (MxEvent mxEvent in eventQueue.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
{
int depth = Math.Max(0, Interlocked.Decrement(ref streamQueueDepth));
metrics.SetEventQueueDepth(depth);
yield return mxEvent;
}
await producerTask.ConfigureAwait(false);
}
finally
{
await streamCts.CancelAsync().ConfigureAwait(false);
subscriber.Dispose();
metrics.StreamDisconnected("Detached");
try
{
await producerTask.ConfigureAwait(false);
}
catch (OperationCanceledException) when (streamCts.IsCancellationRequested)
{
}
catch (Exception exception)
{
logger.LogDebug(
exception,
"Event stream producer stopped for session {SessionId}.",
request.SessionId);
}
}
}
private async Task ProduceEventsAsync(
GatewaySession session,
ulong afterWorkerSequence,
ChannelWriter<MxEvent> writer,
Action eventQueued,
CancellationToken cancellationToken)
{
try
{
await foreach (WorkerEvent workerEvent in session
.ReadEventsAsync(cancellationToken)
.WithCancellation(cancellationToken)
.ConfigureAwait(false))
{
MxEvent publicEvent = mapper.MapEvent(workerEvent);
if (publicEvent.WorkerSequence <= afterWorkerSequence)
{
continue;
}
if (!writer.TryWrite(publicEvent))
{
string message = $"Session {session.SessionId} event stream queue overflowed.";
session.MarkFaulted(message);
metrics.QueueOverflow("grpc-event-stream");
metrics.Fault(SessionManagerErrorCode.EventQueueOverflow.ToString());
writer.TryComplete(new SessionManagerException(
SessionManagerErrorCode.EventQueueOverflow,
message));
return;
}
eventQueued();
}
writer.TryComplete();
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
writer.TryComplete();
}
catch (Exception exception)
{
if (exception is WorkerClientException)
{
session.MarkFaulted(exception.Message);
metrics.Fault(WorkerClientErrorCode.WorkerFaulted.ToString());
}
writer.TryComplete(exception);
}
}
}
@@ -0,0 +1,10 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Grpc;
public interface IEventStreamService
{
IAsyncEnumerable<MxEvent> StreamEventsAsync(
StreamEventsRequest request,
CancellationToken cancellationToken);
}
@@ -0,0 +1,176 @@
using Grpc.Core;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Security.Authorization;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
namespace MxGateway.Server.Grpc;
public sealed class MxAccessGatewayService(
ISessionManager sessionManager,
IGatewayRequestIdentityAccessor identityAccessor,
MxAccessGrpcRequestValidator requestValidator,
MxAccessGrpcMapper mapper,
IEventStreamService eventStreamService,
ILogger<MxAccessGatewayService> logger) : MxAccessGateway.MxAccessGatewayBase
{
public override async Task<OpenSessionReply> OpenSession(
OpenSessionRequest request,
ServerCallContext context)
{
try
{
requestValidator.ValidateOpenSession(request);
GatewaySession session = await sessionManager
.OpenSessionAsync(
SessionOpenRequest.FromContract(request),
ResolveClientIdentity(),
context.CancellationToken)
.ConfigureAwait(false);
OpenSessionReply reply = new()
{
SessionId = session.SessionId,
BackendName = session.BackendName,
WorkerProcessId = session.WorkerProcessId ?? 0,
WorkerProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
DefaultCommandTimeout = Google.Protobuf.WellKnownTypes.Duration.FromTimeSpan(session.CommandTimeout),
ProtocolStatus = MxAccessGrpcMapper.Ok(),
};
reply.Capabilities.Add("unary-open-session");
reply.Capabilities.Add("unary-close-session");
reply.Capabilities.Add("unary-invoke");
reply.Capabilities.Add("server-stream-events");
return reply;
}
catch (Exception exception) when (exception is not RpcException)
{
throw MapException(exception);
}
}
public override async Task<CloseSessionReply> CloseSession(
CloseSessionRequest request,
ServerCallContext context)
{
try
{
requestValidator.ValidateCloseSession(request);
SessionCloseResult result = await sessionManager
.CloseSessionAsync(request.SessionId, context.CancellationToken)
.ConfigureAwait(false);
return new CloseSessionReply
{
SessionId = result.SessionId,
FinalState = result.FinalState,
ProtocolStatus = MxAccessGrpcMapper.Ok(result.AlreadyClosed ? "Session was already closed." : "Session closed."),
};
}
catch (Exception exception) when (exception is not RpcException)
{
throw MapException(exception);
}
}
public override async Task<MxCommandReply> Invoke(
MxCommandRequest request,
ServerCallContext context)
{
try
{
requestValidator.ValidateInvoke(request);
WorkerCommand workerCommand = mapper.MapCommand(request);
WorkerCommandReply workerReply = await sessionManager
.InvokeAsync(request.SessionId, workerCommand, context.CancellationToken)
.ConfigureAwait(false);
return mapper.MapCommandReply(workerReply);
}
catch (Exception exception) when (exception is not RpcException)
{
throw MapException(exception);
}
}
public override async Task StreamEvents(
StreamEventsRequest request,
IServerStreamWriter<MxEvent> responseStream,
ServerCallContext context)
{
try
{
requestValidator.ValidateStreamEvents(request);
await foreach (MxEvent publicEvent in eventStreamService
.StreamEventsAsync(request, context.CancellationToken)
.WithCancellation(context.CancellationToken)
.ConfigureAwait(false))
{
await responseStream.WriteAsync(publicEvent).ConfigureAwait(false);
}
}
catch (Exception exception) when (exception is not RpcException)
{
throw MapException(exception);
}
}
private string? ResolveClientIdentity()
{
return identityAccessor.Current?.DisplayName ?? identityAccessor.Current?.KeyId;
}
private RpcException MapException(Exception exception)
{
if (exception is OperationCanceledException)
{
return new RpcException(new Status(StatusCode.Cancelled, "gRPC request was canceled."));
}
if (exception is SessionManagerException sessionException)
{
return MapSessionException(sessionException);
}
if (exception is WorkerClientException workerClientException)
{
return MapWorkerClientException(workerClientException);
}
logger.LogWarning(exception, "Public gRPC request failed.");
return new RpcException(new Status(StatusCode.Unavailable, "Gateway request failed before an MXAccess reply was available."));
}
private static RpcException MapSessionException(SessionManagerException exception)
{
StatusCode statusCode = exception.ErrorCode switch
{
SessionManagerErrorCode.SessionNotFound => StatusCode.NotFound,
SessionManagerErrorCode.SessionNotReady => StatusCode.FailedPrecondition,
SessionManagerErrorCode.EventSubscriberAlreadyActive => StatusCode.ResourceExhausted,
SessionManagerErrorCode.EventQueueOverflow => StatusCode.ResourceExhausted,
SessionManagerErrorCode.SessionLimitExceeded => StatusCode.ResourceExhausted,
SessionManagerErrorCode.OpenFailed => StatusCode.Unavailable,
SessionManagerErrorCode.CloseFailed => StatusCode.Unavailable,
_ => StatusCode.Unavailable,
};
return new RpcException(new Status(statusCode, exception.Message));
}
private static RpcException MapWorkerClientException(WorkerClientException exception)
{
StatusCode statusCode = exception.ErrorCode switch
{
WorkerClientErrorCode.CommandTimeout => StatusCode.DeadlineExceeded,
WorkerClientErrorCode.GatewayShutdown => StatusCode.Cancelled,
WorkerClientErrorCode.InvalidState => StatusCode.FailedPrecondition,
WorkerClientErrorCode.ProtocolViolation => StatusCode.Internal,
_ => StatusCode.Unavailable,
};
return new RpcException(new Status(statusCode, exception.Message));
}
}
@@ -0,0 +1,124 @@
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Grpc;
public sealed class MxAccessGrpcMapper
{
private readonly TimeProvider _timeProvider;
public MxAccessGrpcMapper(TimeProvider? timeProvider = null)
{
_timeProvider = timeProvider ?? TimeProvider.System;
}
public WorkerCommand MapCommand(MxCommandRequest request)
{
ArgumentNullException.ThrowIfNull(request);
ArgumentNullException.ThrowIfNull(request.Command);
return new WorkerCommand
{
Command = request.Command.Clone(),
EnqueueTimestamp = Timestamp.FromDateTimeOffset(_timeProvider.GetUtcNow()),
};
}
public MxCommandReply MapCommandReply(WorkerCommandReply reply)
{
ArgumentNullException.ThrowIfNull(reply);
if (reply.Reply is null)
{
return new MxCommandReply
{
ProtocolStatus = ProtocolViolation("Worker command reply did not contain a public reply payload."),
};
}
return reply.Reply.Clone();
}
public MxEvent MapEvent(WorkerEvent workerEvent)
{
ArgumentNullException.ThrowIfNull(workerEvent);
return workerEvent.Event?.Clone() ?? new MxEvent
{
Family = MxEventFamily.Unspecified,
RawStatus = "Worker event did not contain a public event payload.",
};
}
public static ProtocolStatus Ok(string message = "OK")
{
return new ProtocolStatus
{
Code = ProtocolStatusCode.Ok,
Message = message,
};
}
public static ProtocolStatus InvalidRequest(string message)
{
return new ProtocolStatus
{
Code = ProtocolStatusCode.InvalidRequest,
Message = message,
};
}
public static ProtocolStatus SessionNotFound(string message)
{
return new ProtocolStatus
{
Code = ProtocolStatusCode.SessionNotFound,
Message = message,
};
}
public static ProtocolStatus SessionNotReady(string message)
{
return new ProtocolStatus
{
Code = ProtocolStatusCode.SessionNotReady,
Message = message,
};
}
public static ProtocolStatus WorkerUnavailable(string message)
{
return new ProtocolStatus
{
Code = ProtocolStatusCode.WorkerUnavailable,
Message = message,
};
}
public static ProtocolStatus Timeout(string message)
{
return new ProtocolStatus
{
Code = ProtocolStatusCode.Timeout,
Message = message,
};
}
public static ProtocolStatus Canceled(string message)
{
return new ProtocolStatus
{
Code = ProtocolStatusCode.Canceled,
Message = message,
};
}
public static ProtocolStatus ProtocolViolation(string message)
{
return new ProtocolStatus
{
Code = ProtocolStatusCode.ProtocolViolation,
Message = message,
};
}
}
@@ -0,0 +1,101 @@
using Grpc.Core;
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Grpc;
public sealed class MxAccessGrpcRequestValidator
{
public void ValidateOpenSession(OpenSessionRequest request)
{
ArgumentNullException.ThrowIfNull(request);
if (request.CommandTimeout is not null && request.CommandTimeout.ToTimeSpan() <= TimeSpan.Zero)
{
throw InvalidArgument("Command timeout must be greater than zero when provided.");
}
}
public void ValidateCloseSession(CloseSessionRequest request)
{
ArgumentNullException.ThrowIfNull(request);
RequireSessionId(request.SessionId);
}
public void ValidateStreamEvents(StreamEventsRequest request)
{
ArgumentNullException.ThrowIfNull(request);
RequireSessionId(request.SessionId);
}
public void ValidateInvoke(MxCommandRequest request)
{
ArgumentNullException.ThrowIfNull(request);
RequireSessionId(request.SessionId);
if (request.Command is null)
{
throw InvalidArgument("Invoke requires a command payload.");
}
if (request.Command.Kind is MxCommandKind.Unspecified)
{
throw InvalidArgument("Invoke requires a command kind.");
}
ValidateCommandPayload(request.Command);
}
private static void RequireSessionId(string sessionId)
{
if (string.IsNullOrWhiteSpace(sessionId))
{
throw InvalidArgument("Session id is required.");
}
}
private static void ValidateCommandPayload(MxCommand command)
{
MxCommand.PayloadOneofCase expectedPayload = ExpectedPayload(command.Kind);
if (command.PayloadCase != expectedPayload)
{
throw InvalidArgument(
$"Command kind {command.Kind} requires payload {expectedPayload} but received {command.PayloadCase}.");
}
}
private static MxCommand.PayloadOneofCase ExpectedPayload(MxCommandKind kind)
{
return kind switch
{
MxCommandKind.Register => MxCommand.PayloadOneofCase.Register,
MxCommandKind.Unregister => MxCommand.PayloadOneofCase.Unregister,
MxCommandKind.AddItem => MxCommand.PayloadOneofCase.AddItem,
MxCommandKind.AddItem2 => MxCommand.PayloadOneofCase.AddItem2,
MxCommandKind.RemoveItem => MxCommand.PayloadOneofCase.RemoveItem,
MxCommandKind.Advise => MxCommand.PayloadOneofCase.Advise,
MxCommandKind.UnAdvise => MxCommand.PayloadOneofCase.UnAdvise,
MxCommandKind.AdviseSupervisory => MxCommand.PayloadOneofCase.AdviseSupervisory,
MxCommandKind.AddBufferedItem => MxCommand.PayloadOneofCase.AddBufferedItem,
MxCommandKind.SetBufferedUpdateInterval => MxCommand.PayloadOneofCase.SetBufferedUpdateInterval,
MxCommandKind.Suspend => MxCommand.PayloadOneofCase.Suspend,
MxCommandKind.Activate => MxCommand.PayloadOneofCase.Activate,
MxCommandKind.Write => MxCommand.PayloadOneofCase.Write,
MxCommandKind.Write2 => MxCommand.PayloadOneofCase.Write2,
MxCommandKind.WriteSecured => MxCommand.PayloadOneofCase.WriteSecured,
MxCommandKind.WriteSecured2 => MxCommand.PayloadOneofCase.WriteSecured2,
MxCommandKind.AuthenticateUser => MxCommand.PayloadOneofCase.AuthenticateUser,
MxCommandKind.ArchestraUserToId => MxCommand.PayloadOneofCase.ArchestraUserToId,
MxCommandKind.Ping => MxCommand.PayloadOneofCase.Ping,
MxCommandKind.GetSessionState => MxCommand.PayloadOneofCase.GetSessionState,
MxCommandKind.GetWorkerInfo => MxCommand.PayloadOneofCase.GetWorkerInfo,
MxCommandKind.DrainEvents => MxCommand.PayloadOneofCase.DrainEvents,
MxCommandKind.ShutdownWorker => MxCommand.PayloadOneofCase.ShutdownWorker,
_ => MxCommand.PayloadOneofCase.None,
};
}
private static RpcException InvalidArgument(string detail)
{
return new RpcException(new Status(StatusCode.InvalidArgument, detail));
}
}
@@ -4,6 +4,11 @@
<TargetFramework>net10.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Grpc.AspNetCore" Version="2.76.0" />
<PackageReference Include="Microsoft.Data.Sqlite" Version="10.0.7" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\MxGateway.Contracts\MxGateway.Contracts.csproj" />
</ItemGroup>
+37 -1
View File
@@ -1,7 +1,43 @@
using MxGateway.Server;
using MxGateway.Server.Configuration;
using MxGateway.Server.Security.Authentication;
var app = GatewayApplication.Build(args);
ApiKeyAdminParseResult apiKeyAdminCommand = ApiKeyAdminCommandLineParser.Parse(args);
if (apiKeyAdminCommand.IsApiKeyCommand)
{
if (apiKeyAdminCommand.Command is null)
{
await Console.Error.WriteLineAsync(apiKeyAdminCommand.Error);
return 2;
}
WebApplicationBuilder builder = GatewayApplication.CreateBuilder([]);
ApplyApiKeyAdminOverrides(builder.Configuration, apiKeyAdminCommand.Command);
await using WebApplication cliApp = builder.Build();
await using AsyncServiceScope scope = cliApp.Services.CreateAsyncScope();
ApiKeyAdminCliRunner runner = scope.ServiceProvider.GetRequiredService<ApiKeyAdminCliRunner>();
return await runner.RunAsync(apiKeyAdminCommand.Command, Console.Out, CancellationToken.None);
}
WebApplication app = GatewayApplication.Build(args);
app.Run();
return 0;
static void ApplyApiKeyAdminOverrides(IConfiguration configuration, ApiKeyAdminCommand command)
{
if (!string.IsNullOrWhiteSpace(command.SqlitePath))
{
configuration[$"{GatewayOptions.SectionName}:Authentication:SqlitePath"] = command.SqlitePath;
}
if (!string.IsNullOrWhiteSpace(command.Pepper))
{
configuration["MxGateway:ApiKeyPepper"] = command.Pepper;
}
}
public partial class Program;
@@ -0,0 +1,180 @@
using System.Text.Json;
namespace MxGateway.Server.Security.Authentication;
public sealed class ApiKeyAdminCliRunner(
IAuthStoreMigrator migrator,
IApiKeyAdminStore adminStore,
IApiKeyAuditStore auditStore,
IApiKeySecretHasher hasher)
{
private static readonly JsonSerializerOptions JsonOptions = new()
{
WriteIndented = true
};
public async Task<int> RunAsync(
ApiKeyAdminCommand command,
TextWriter output,
CancellationToken cancellationToken)
{
ApiKeyAdminOutput result = command.Kind switch
{
ApiKeyAdminCommandKind.InitDb => await InitDbAsync(cancellationToken).ConfigureAwait(false),
ApiKeyAdminCommandKind.CreateKey => await CreateKeyAsync(command, cancellationToken).ConfigureAwait(false),
ApiKeyAdminCommandKind.ListKeys => await ListKeysAsync(cancellationToken).ConfigureAwait(false),
ApiKeyAdminCommandKind.RevokeKey => await RevokeKeyAsync(command, cancellationToken).ConfigureAwait(false),
ApiKeyAdminCommandKind.RotateKey => await RotateKeyAsync(command, cancellationToken).ConfigureAwait(false),
_ => throw new InvalidOperationException($"Unsupported API key command '{command.Kind}'.")
};
await WriteOutputAsync(command, result, output).ConfigureAwait(false);
return 0;
}
private async Task<ApiKeyAdminOutput> InitDbAsync(CancellationToken cancellationToken)
{
await migrator.MigrateAsync(cancellationToken).ConfigureAwait(false);
await AppendAuditAsync(null, "init-db", null, cancellationToken).ConfigureAwait(false);
return new ApiKeyAdminOutput("init-db", "initialized", null, []);
}
private async Task<ApiKeyAdminOutput> CreateKeyAsync(
ApiKeyAdminCommand command,
CancellationToken cancellationToken)
{
await migrator.MigrateAsync(cancellationToken).ConfigureAwait(false);
string keyId = Required(command.KeyId);
string secret = ApiKeySecretGenerator.Generate();
string apiKey = FormatApiKey(keyId, secret);
await adminStore.CreateAsync(
new ApiKeyCreateRequest(
KeyId: keyId,
KeyPrefix: $"mxgw_{keyId}",
SecretHash: hasher.HashSecret(secret),
DisplayName: Required(command.DisplayName),
Scopes: command.Scopes,
CreatedUtc: DateTimeOffset.UtcNow),
cancellationToken)
.ConfigureAwait(false);
await AppendAuditAsync(keyId, "create-key", null, cancellationToken).ConfigureAwait(false);
return new ApiKeyAdminOutput("create-key", "created", apiKey, []);
}
private async Task<ApiKeyAdminOutput> ListKeysAsync(CancellationToken cancellationToken)
{
await migrator.MigrateAsync(cancellationToken).ConfigureAwait(false);
IReadOnlyList<ApiKeyRecord> keys = await adminStore.ListAsync(cancellationToken).ConfigureAwait(false);
await AppendAuditAsync(null, "list-keys", null, cancellationToken).ConfigureAwait(false);
return new ApiKeyAdminOutput(
"list-keys",
"ok",
null,
keys.Select(ToListedKey).ToArray());
}
private async Task<ApiKeyAdminOutput> RevokeKeyAsync(
ApiKeyAdminCommand command,
CancellationToken cancellationToken)
{
await migrator.MigrateAsync(cancellationToken).ConfigureAwait(false);
string keyId = Required(command.KeyId);
bool revoked = await adminStore.RevokeAsync(keyId, DateTimeOffset.UtcNow, cancellationToken)
.ConfigureAwait(false);
await AppendAuditAsync(keyId, "revoke-key", revoked ? "revoked" : "not-found-or-already-revoked", cancellationToken)
.ConfigureAwait(false);
return new ApiKeyAdminOutput("revoke-key", revoked ? "revoked" : "not-found-or-already-revoked", null, []);
}
private async Task<ApiKeyAdminOutput> RotateKeyAsync(
ApiKeyAdminCommand command,
CancellationToken cancellationToken)
{
await migrator.MigrateAsync(cancellationToken).ConfigureAwait(false);
string keyId = Required(command.KeyId);
string secret = ApiKeySecretGenerator.Generate();
string apiKey = FormatApiKey(keyId, secret);
bool rotated = await adminStore.RotateAsync(keyId, hasher.HashSecret(secret), DateTimeOffset.UtcNow, cancellationToken)
.ConfigureAwait(false);
await AppendAuditAsync(keyId, "rotate-key", rotated ? "rotated" : "not-found", cancellationToken)
.ConfigureAwait(false);
return new ApiKeyAdminOutput("rotate-key", rotated ? "rotated" : "not-found", rotated ? apiKey : null, []);
}
private static async Task WriteOutputAsync(
ApiKeyAdminCommand command,
ApiKeyAdminOutput result,
TextWriter output)
{
if (command.Json)
{
await output.WriteLineAsync(JsonSerializer.Serialize(result, JsonOptions)).ConfigureAwait(false);
return;
}
await output.WriteLineAsync($"{result.Command}: {result.Status}").ConfigureAwait(false);
if (result.ApiKey is not null)
{
await output.WriteLineAsync($"API key: {result.ApiKey}").ConfigureAwait(false);
}
foreach (ApiKeyAdminListedKey key in result.Keys)
{
string revoked = key.RevokedUtc is null ? "active" : "revoked";
await output.WriteLineAsync($"{key.KeyId}\t{key.DisplayName}\t{revoked}\t{string.Join(',', key.Scopes)}")
.ConfigureAwait(false);
}
}
private async Task AppendAuditAsync(
string? keyId,
string eventType,
string? details,
CancellationToken cancellationToken)
{
await auditStore.AppendAsync(
new ApiKeyAuditEntry(
KeyId: keyId,
EventType: eventType,
RemoteAddress: null,
Details: details),
cancellationToken)
.ConfigureAwait(false);
}
private static ApiKeyAdminListedKey ToListedKey(ApiKeyRecord key)
{
return new ApiKeyAdminListedKey(
KeyId: key.KeyId,
KeyPrefix: key.KeyPrefix,
DisplayName: key.DisplayName,
Scopes: key.Scopes,
CreatedUtc: key.CreatedUtc,
LastUsedUtc: key.LastUsedUtc,
RevokedUtc: key.RevokedUtc);
}
private static string FormatApiKey(string keyId, string secret)
{
return $"mxgw_{keyId}_{secret}";
}
private static string Required(string? value)
{
return value ?? throw new InvalidOperationException("Required command value was not provided.");
}
}
@@ -0,0 +1,10 @@
namespace MxGateway.Server.Security.Authentication;
public sealed record ApiKeyAdminCommand(
ApiKeyAdminCommandKind Kind,
bool Json,
string? SqlitePath,
string? Pepper,
string? KeyId,
string? DisplayName,
IReadOnlySet<string> Scopes);
@@ -0,0 +1,10 @@
namespace MxGateway.Server.Security.Authentication;
public enum ApiKeyAdminCommandKind
{
InitDb,
CreateKey,
ListKeys,
RevokeKey,
RotateKey
}
@@ -0,0 +1,159 @@
namespace MxGateway.Server.Security.Authentication;
public static class ApiKeyAdminCommandLineParser
{
public static ApiKeyAdminParseResult Parse(IReadOnlyList<string> args)
{
if (args.Count == 0 || !string.Equals(args[0], "apikey", StringComparison.OrdinalIgnoreCase))
{
return ApiKeyAdminParseResult.NotApiKeyCommand();
}
if (args.Count < 2)
{
return ApiKeyAdminParseResult.Fail("Missing apikey subcommand.");
}
if (!TryParseKind(args[1], out ApiKeyAdminCommandKind kind))
{
return ApiKeyAdminParseResult.Fail($"Unknown apikey subcommand '{args[1]}'.");
}
Dictionary<string, string?> options = new(StringComparer.OrdinalIgnoreCase);
bool json = false;
for (int index = 2; index < args.Count; index++)
{
string arg = args[index];
if (string.Equals(arg, "--json", StringComparison.OrdinalIgnoreCase))
{
json = true;
continue;
}
if (!arg.StartsWith("--", StringComparison.Ordinal))
{
return ApiKeyAdminParseResult.Fail($"Unexpected argument '{arg}'.");
}
string name = arg[2..];
string? value;
int equalsIndex = name.IndexOf('=', StringComparison.Ordinal);
if (equalsIndex >= 0)
{
value = name[(equalsIndex + 1)..];
name = name[..equalsIndex];
}
else
{
if (index + 1 >= args.Count || args[index + 1].StartsWith("--", StringComparison.Ordinal))
{
return ApiKeyAdminParseResult.Fail($"Option '--{name}' requires a value.");
}
value = args[++index];
}
options[name] = value;
}
string? keyId = GetOption(options, "key-id");
string? displayName = GetOption(options, "display-name");
IReadOnlySet<string> scopes = ParseScopes(GetOption(options, "scopes"));
string? validationError = Validate(kind, keyId, displayName);
if (validationError is not null)
{
return ApiKeyAdminParseResult.Fail(validationError);
}
return ApiKeyAdminParseResult.Success(new ApiKeyAdminCommand(
Kind: kind,
Json: json,
SqlitePath: GetOption(options, "sqlite-path"),
Pepper: GetOption(options, "pepper"),
KeyId: keyId,
DisplayName: displayName,
Scopes: scopes));
}
private static bool TryParseKind(string value, out ApiKeyAdminCommandKind kind)
{
switch (value.ToLowerInvariant())
{
case "init-db":
kind = ApiKeyAdminCommandKind.InitDb;
return true;
case "create-key":
kind = ApiKeyAdminCommandKind.CreateKey;
return true;
case "list-keys":
kind = ApiKeyAdminCommandKind.ListKeys;
return true;
case "revoke-key":
kind = ApiKeyAdminCommandKind.RevokeKey;
return true;
case "rotate-key":
kind = ApiKeyAdminCommandKind.RotateKey;
return true;
default:
kind = default;
return false;
}
}
private static string? Validate(ApiKeyAdminCommandKind kind, string? keyId, string? displayName)
{
if (kind is ApiKeyAdminCommandKind.CreateKey or ApiKeyAdminCommandKind.RevokeKey or ApiKeyAdminCommandKind.RotateKey
&& string.IsNullOrWhiteSpace(keyId))
{
return $"Subcommand '{KindName(kind)}' requires --key-id.";
}
if (!string.IsNullOrWhiteSpace(keyId) && !IsValidKeyId(keyId))
{
return "API key id may contain only letters, numbers, periods, and hyphens.";
}
if (kind == ApiKeyAdminCommandKind.CreateKey && string.IsNullOrWhiteSpace(displayName))
{
return "Subcommand 'create-key' requires --display-name.";
}
return null;
}
private static string KindName(ApiKeyAdminCommandKind kind)
{
return kind switch
{
ApiKeyAdminCommandKind.InitDb => "init-db",
ApiKeyAdminCommandKind.CreateKey => "create-key",
ApiKeyAdminCommandKind.ListKeys => "list-keys",
ApiKeyAdminCommandKind.RevokeKey => "revoke-key",
ApiKeyAdminCommandKind.RotateKey => "rotate-key",
_ => kind.ToString()
};
}
private static bool IsValidKeyId(string keyId)
{
return keyId.All(character =>
char.IsAsciiLetterOrDigit(character)
|| character is '.' or '-');
}
private static string? GetOption(Dictionary<string, string?> options, string name)
{
return options.TryGetValue(name, out string? value) ? value : null;
}
private static IReadOnlySet<string> ParseScopes(string? scopes)
{
return new HashSet<string>(
(scopes ?? string.Empty)
.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries),
StringComparer.Ordinal);
}
}
@@ -0,0 +1,10 @@
namespace MxGateway.Server.Security.Authentication;
public sealed record ApiKeyAdminListedKey(
string KeyId,
string KeyPrefix,
string DisplayName,
IReadOnlySet<string> Scopes,
DateTimeOffset CreatedUtc,
DateTimeOffset? LastUsedUtc,
DateTimeOffset? RevokedUtc);
@@ -0,0 +1,7 @@
namespace MxGateway.Server.Security.Authentication;
public sealed record ApiKeyAdminOutput(
string Command,
string Status,
string? ApiKey,
IReadOnlyList<ApiKeyAdminListedKey> Keys);
@@ -0,0 +1,22 @@
namespace MxGateway.Server.Security.Authentication;
public sealed record ApiKeyAdminParseResult(
bool IsApiKeyCommand,
ApiKeyAdminCommand? Command,
string? Error)
{
public static ApiKeyAdminParseResult NotApiKeyCommand()
{
return new ApiKeyAdminParseResult(false, null, null);
}
public static ApiKeyAdminParseResult Success(ApiKeyAdminCommand command)
{
return new ApiKeyAdminParseResult(true, command, null);
}
public static ApiKeyAdminParseResult Fail(string error)
{
return new ApiKeyAdminParseResult(true, null, error);
}
}
@@ -0,0 +1,7 @@
namespace MxGateway.Server.Security.Authentication;
public sealed record ApiKeyAuditEntry(
string? KeyId,
string EventType,
string? RemoteAddress,
string? Details);
@@ -0,0 +1,9 @@
namespace MxGateway.Server.Security.Authentication;
public sealed record ApiKeyAuditRecord(
long AuditId,
string? KeyId,
string EventType,
string? RemoteAddress,
DateTimeOffset CreatedUtc,
string? Details);
@@ -0,0 +1,9 @@
namespace MxGateway.Server.Security.Authentication;
public sealed record ApiKeyCreateRequest(
string KeyId,
string KeyPrefix,
byte[] SecretHash,
string DisplayName,
IReadOnlySet<string> Scopes,
DateTimeOffset CreatedUtc);
@@ -0,0 +1,7 @@
namespace MxGateway.Server.Security.Authentication;
public sealed record ApiKeyIdentity(
string KeyId,
string KeyPrefix,
string DisplayName,
IReadOnlySet<string> Scopes);
@@ -0,0 +1,45 @@
namespace MxGateway.Server.Security.Authentication;
public sealed class ApiKeyParser : IApiKeyParser
{
private const string BearerPrefix = "Bearer ";
private const string TokenPrefix = "mxgw_";
public bool TryParseAuthorizationHeader(string? authorizationHeader, out ParsedApiKey? apiKey)
{
apiKey = null;
if (string.IsNullOrWhiteSpace(authorizationHeader)
|| !authorizationHeader.StartsWith(BearerPrefix, StringComparison.OrdinalIgnoreCase))
{
return false;
}
string token = authorizationHeader[BearerPrefix.Length..].Trim();
if (!token.StartsWith(TokenPrefix, StringComparison.OrdinalIgnoreCase))
{
return false;
}
string keyPayload = token[TokenPrefix.Length..];
int separatorIndex = keyPayload.IndexOf('_', StringComparison.Ordinal);
if (separatorIndex <= 0 || separatorIndex == keyPayload.Length - 1)
{
return false;
}
string keyId = keyPayload[..separatorIndex];
string secret = keyPayload[(separatorIndex + 1)..];
if (string.IsNullOrWhiteSpace(keyId) || string.IsNullOrWhiteSpace(secret))
{
return false;
}
apiKey = new ParsedApiKey(keyId, secret);
return true;
}
}
@@ -0,0 +1,4 @@
namespace MxGateway.Server.Security.Authentication;
public sealed class ApiKeyPepperUnavailableException(string pepperSecretName)
: InvalidOperationException($"API key pepper secret '{pepperSecretName}' is not configured.");
@@ -0,0 +1,11 @@
namespace MxGateway.Server.Security.Authentication;
public sealed record ApiKeyRecord(
string KeyId,
string KeyPrefix,
byte[] SecretHash,
string DisplayName,
IReadOnlySet<string> Scopes,
DateTimeOffset CreatedUtc,
DateTimeOffset? LastUsedUtc,
DateTimeOffset? RevokedUtc);
@@ -0,0 +1,26 @@
using Microsoft.Data.Sqlite;
namespace MxGateway.Server.Security.Authentication;
public static class ApiKeyRecordReader
{
public static ApiKeyRecord Read(SqliteDataReader reader)
{
return new ApiKeyRecord(
KeyId: reader.GetString(0),
KeyPrefix: reader.GetString(1),
SecretHash: (byte[])reader["secret_hash"],
DisplayName: reader.GetString(3),
Scopes: ApiKeyScopeSerializer.Deserialize(reader.GetString(4)),
CreatedUtc: DateTimeOffset.Parse(reader.GetString(5), System.Globalization.CultureInfo.InvariantCulture),
LastUsedUtc: ReadNullableDateTimeOffset(reader, 6),
RevokedUtc: ReadNullableDateTimeOffset(reader, 7));
}
private static DateTimeOffset? ReadNullableDateTimeOffset(SqliteDataReader reader, int ordinal)
{
return reader.IsDBNull(ordinal)
? null
: DateTimeOffset.Parse(reader.GetString(ordinal), System.Globalization.CultureInfo.InvariantCulture);
}
}
@@ -0,0 +1,23 @@
using System.Text.Json;
namespace MxGateway.Server.Security.Authentication;
public static class ApiKeyScopeSerializer
{
public static string Serialize(IReadOnlySet<string> scopes)
{
return JsonSerializer.Serialize(scopes.Order(StringComparer.Ordinal));
}
public static IReadOnlySet<string> Deserialize(string value)
{
if (string.IsNullOrWhiteSpace(value))
{
return new HashSet<string>(StringComparer.Ordinal);
}
string[]? scopes = JsonSerializer.Deserialize<string[]>(value);
return new HashSet<string>(scopes ?? [], StringComparer.Ordinal);
}
}
@@ -0,0 +1,17 @@
using System.Security.Cryptography;
namespace MxGateway.Server.Security.Authentication;
public static class ApiKeySecretGenerator
{
public static string Generate()
{
Span<byte> bytes = stackalloc byte[32];
RandomNumberGenerator.Fill(bytes);
return Convert.ToBase64String(bytes)
.TrimEnd('=')
.Replace('+', '-')
.Replace('/', '_');
}
}
@@ -0,0 +1,35 @@
using System.Security.Cryptography;
using System.Text;
using Microsoft.Extensions.Options;
using MxGateway.Server.Configuration;
namespace MxGateway.Server.Security.Authentication;
public sealed class ApiKeySecretHasher(
IConfiguration configuration,
IOptions<GatewayOptions> options) : IApiKeySecretHasher
{
public byte[] HashSecret(string secret)
{
string pepper = GetPepper();
byte[] pepperBytes = Encoding.UTF8.GetBytes(pepper);
byte[] secretBytes = Encoding.UTF8.GetBytes(secret);
using HMACSHA256 hmac = new(pepperBytes);
return hmac.ComputeHash(secretBytes);
}
private string GetPepper()
{
string pepperSecretName = options.Value.Authentication.PepperSecretName;
string? pepper = configuration[pepperSecretName];
if (string.IsNullOrWhiteSpace(pepper))
{
throw new ApiKeyPepperUnavailableException(pepperSecretName);
}
return pepper;
}
}
@@ -0,0 +1,11 @@
namespace MxGateway.Server.Security.Authentication;
public enum ApiKeyVerificationFailure
{
None,
MissingOrMalformedCredentials,
PepperUnavailable,
KeyNotFound,
KeyRevoked,
SecretMismatch
}
@@ -0,0 +1,23 @@
namespace MxGateway.Server.Security.Authentication;
public sealed record ApiKeyVerificationResult(
bool Succeeded,
ApiKeyIdentity? Identity,
ApiKeyVerificationFailure Failure)
{
public static ApiKeyVerificationResult Success(ApiKeyIdentity identity)
{
return new ApiKeyVerificationResult(
Succeeded: true,
Identity: identity,
Failure: ApiKeyVerificationFailure.None);
}
public static ApiKeyVerificationResult Fail(ApiKeyVerificationFailure failure)
{
return new ApiKeyVerificationResult(
Succeeded: false,
Identity: null,
Failure: failure);
}
}
@@ -0,0 +1,57 @@
using System.Security.Cryptography;
namespace MxGateway.Server.Security.Authentication;
public sealed class ApiKeyVerifier(
IApiKeyParser parser,
IApiKeySecretHasher hasher,
IApiKeyStore keyStore) : IApiKeyVerifier
{
public async Task<ApiKeyVerificationResult> VerifyAsync(
string? authorizationHeader,
CancellationToken cancellationToken)
{
if (!parser.TryParseAuthorizationHeader(authorizationHeader, out ParsedApiKey? parsedKey)
|| parsedKey is null)
{
return ApiKeyVerificationResult.Fail(ApiKeyVerificationFailure.MissingOrMalformedCredentials);
}
ApiKeyRecord? storedKey = await keyStore.FindByKeyIdAsync(parsedKey.KeyId, cancellationToken)
.ConfigureAwait(false);
if (storedKey is null)
{
return ApiKeyVerificationResult.Fail(ApiKeyVerificationFailure.KeyNotFound);
}
if (storedKey.RevokedUtc is not null)
{
return ApiKeyVerificationResult.Fail(ApiKeyVerificationFailure.KeyRevoked);
}
byte[] presentedHash;
try
{
presentedHash = hasher.HashSecret(parsedKey.Secret);
}
catch (ApiKeyPepperUnavailableException)
{
return ApiKeyVerificationResult.Fail(ApiKeyVerificationFailure.PepperUnavailable);
}
if (!CryptographicOperations.FixedTimeEquals(presentedHash, storedKey.SecretHash))
{
return ApiKeyVerificationResult.Fail(ApiKeyVerificationFailure.SecretMismatch);
}
await keyStore.MarkKeyUsedAsync(storedKey.KeyId, DateTimeOffset.UtcNow, cancellationToken)
.ConfigureAwait(false);
return ApiKeyVerificationResult.Success(new ApiKeyIdentity(
KeyId: storedKey.KeyId,
KeyPrefix: storedKey.KeyPrefix,
DisplayName: storedKey.DisplayName,
Scopes: storedKey.Scopes));
}
}
@@ -0,0 +1,27 @@
using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Options;
using MxGateway.Server.Configuration;
namespace MxGateway.Server.Security.Authentication;
public sealed class AuthSqliteConnectionFactory(IOptions<GatewayOptions> options)
{
public SqliteConnection CreateConnection()
{
string sqlitePath = options.Value.Authentication.SqlitePath;
string? directory = Path.GetDirectoryName(sqlitePath);
if (!string.IsNullOrWhiteSpace(directory))
{
Directory.CreateDirectory(directory);
}
SqliteConnectionStringBuilder builder = new()
{
DataSource = sqlitePath,
Mode = SqliteOpenMode.ReadWriteCreate
};
return new SqliteConnection(builder.ToString());
}
}
@@ -0,0 +1,3 @@
namespace MxGateway.Server.Security.Authentication;
public sealed class AuthStoreMigrationException(string message) : InvalidOperationException(message);
@@ -0,0 +1,24 @@
using Microsoft.Extensions.Options;
using MxGateway.Server.Configuration;
namespace MxGateway.Server.Security.Authentication;
public sealed class AuthStoreMigrationHostedService(
IOptions<GatewayOptions> options,
IAuthStoreMigrator migrator) : IHostedService
{
public async Task StartAsync(CancellationToken cancellationToken)
{
AuthenticationOptions authentication = options.Value.Authentication;
if (authentication.Mode == AuthenticationMode.ApiKey && authentication.RunMigrationsOnStartup)
{
await migrator.MigrateAsync(cancellationToken).ConfigureAwait(false);
}
}
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
@@ -0,0 +1,20 @@
namespace MxGateway.Server.Security.Authentication;
public static class AuthStoreServiceCollectionExtensions
{
public static IServiceCollection AddSqliteAuthStore(this IServiceCollection services)
{
services.AddSingleton<IApiKeyParser, ApiKeyParser>();
services.AddSingleton<IApiKeySecretHasher, ApiKeySecretHasher>();
services.AddSingleton<IApiKeyVerifier, ApiKeyVerifier>();
services.AddSingleton<ApiKeyAdminCliRunner>();
services.AddSingleton<AuthSqliteConnectionFactory>();
services.AddSingleton<IAuthStoreMigrator, SqliteAuthStoreMigrator>();
services.AddSingleton<IApiKeyStore, SqliteApiKeyStore>();
services.AddSingleton<IApiKeyAdminStore, SqliteApiKeyAdminStore>();
services.AddSingleton<IApiKeyAuditStore, SqliteApiKeyAuditStore>();
services.AddHostedService<AuthStoreMigrationHostedService>();
return services;
}
}
@@ -0,0 +1,16 @@
namespace MxGateway.Server.Security.Authentication;
public interface IApiKeyAdminStore
{
Task CreateAsync(ApiKeyCreateRequest request, CancellationToken cancellationToken);
Task<IReadOnlyList<ApiKeyRecord>> ListAsync(CancellationToken cancellationToken);
Task<bool> RevokeAsync(string keyId, DateTimeOffset revokedUtc, CancellationToken cancellationToken);
Task<bool> RotateAsync(
string keyId,
byte[] secretHash,
DateTimeOffset rotatedUtc,
CancellationToken cancellationToken);
}
@@ -0,0 +1,8 @@
namespace MxGateway.Server.Security.Authentication;
public interface IApiKeyAuditStore
{
Task AppendAsync(ApiKeyAuditEntry entry, CancellationToken cancellationToken);
Task<IReadOnlyList<ApiKeyAuditRecord>> ListRecentAsync(int count, CancellationToken cancellationToken);
}
@@ -0,0 +1,6 @@
namespace MxGateway.Server.Security.Authentication;
public interface IApiKeyParser
{
bool TryParseAuthorizationHeader(string? authorizationHeader, out ParsedApiKey? apiKey);
}
@@ -0,0 +1,6 @@
namespace MxGateway.Server.Security.Authentication;
public interface IApiKeySecretHasher
{
byte[] HashSecret(string secret);
}
@@ -0,0 +1,10 @@
namespace MxGateway.Server.Security.Authentication;
public interface IApiKeyStore
{
Task<ApiKeyRecord?> FindByKeyIdAsync(string keyId, CancellationToken cancellationToken);
Task<ApiKeyRecord?> FindActiveByKeyIdAsync(string keyId, CancellationToken cancellationToken);
Task MarkKeyUsedAsync(string keyId, DateTimeOffset usedUtc, CancellationToken cancellationToken);
}
@@ -0,0 +1,8 @@
namespace MxGateway.Server.Security.Authentication;
public interface IApiKeyVerifier
{
Task<ApiKeyVerificationResult> VerifyAsync(
string? authorizationHeader,
CancellationToken cancellationToken);
}
@@ -0,0 +1,6 @@
namespace MxGateway.Server.Security.Authentication;
public interface IAuthStoreMigrator
{
Task MigrateAsync(CancellationToken cancellationToken);
}
@@ -0,0 +1,3 @@
namespace MxGateway.Server.Security.Authentication;
public sealed record ParsedApiKey(string KeyId, string Secret);
@@ -0,0 +1,116 @@
using Microsoft.Data.Sqlite;
namespace MxGateway.Server.Security.Authentication;
public sealed class SqliteApiKeyAdminStore(AuthSqliteConnectionFactory connectionFactory) : IApiKeyAdminStore
{
public async Task CreateAsync(ApiKeyCreateRequest request, CancellationToken cancellationToken)
{
await using SqliteConnection connection = connectionFactory.CreateConnection();
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
await using SqliteCommand command = connection.CreateCommand();
command.CommandText = """
INSERT INTO api_keys (
key_id,
key_prefix,
secret_hash,
display_name,
scopes,
created_utc,
last_used_utc,
revoked_utc)
VALUES (
$key_id,
$key_prefix,
$secret_hash,
$display_name,
$scopes,
$created_utc,
NULL,
NULL);
""";
AddCreateParameters(command, request);
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
public async Task<IReadOnlyList<ApiKeyRecord>> ListAsync(CancellationToken cancellationToken)
{
await using SqliteConnection connection = connectionFactory.CreateConnection();
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
await using SqliteCommand command = connection.CreateCommand();
command.CommandText = """
SELECT key_id, key_prefix, secret_hash, display_name, scopes, created_utc, last_used_utc, revoked_utc
FROM api_keys
ORDER BY key_id;
""";
List<ApiKeyRecord> records = [];
await using SqliteDataReader reader = await command.ExecuteReaderAsync(cancellationToken)
.ConfigureAwait(false);
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
records.Add(ApiKeyRecordReader.Read(reader));
}
return records;
}
public async Task<bool> RevokeAsync(string keyId, DateTimeOffset revokedUtc, CancellationToken cancellationToken)
{
await using SqliteConnection connection = connectionFactory.CreateConnection();
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
await using SqliteCommand command = connection.CreateCommand();
command.CommandText = """
UPDATE api_keys
SET revoked_utc = $revoked_utc
WHERE key_id = $key_id AND revoked_utc IS NULL;
""";
command.Parameters.AddWithValue("$key_id", keyId);
command.Parameters.AddWithValue("$revoked_utc", revokedUtc.ToString("O"));
int rows = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
return rows > 0;
}
public async Task<bool> RotateAsync(
string keyId,
byte[] secretHash,
DateTimeOffset rotatedUtc,
CancellationToken cancellationToken)
{
await using SqliteConnection connection = connectionFactory.CreateConnection();
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
await using SqliteCommand command = connection.CreateCommand();
command.CommandText = """
UPDATE api_keys
SET secret_hash = $secret_hash,
last_used_utc = NULL,
revoked_utc = NULL
WHERE key_id = $key_id;
""";
command.Parameters.AddWithValue("$key_id", keyId);
command.Parameters.Add("$secret_hash", SqliteType.Blob).Value = secretHash;
int rows = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
return rows > 0;
}
private static void AddCreateParameters(SqliteCommand command, ApiKeyCreateRequest request)
{
command.Parameters.AddWithValue("$key_id", request.KeyId);
command.Parameters.AddWithValue("$key_prefix", request.KeyPrefix);
command.Parameters.Add("$secret_hash", SqliteType.Blob).Value = request.SecretHash;
command.Parameters.AddWithValue("$display_name", request.DisplayName);
command.Parameters.AddWithValue("$scopes", ApiKeyScopeSerializer.Serialize(request.Scopes));
command.Parameters.AddWithValue("$created_utc", request.CreatedUtc.ToString("O"));
}
}
@@ -0,0 +1,65 @@
using Microsoft.Data.Sqlite;
namespace MxGateway.Server.Security.Authentication;
public sealed class SqliteApiKeyAuditStore(AuthSqliteConnectionFactory connectionFactory) : IApiKeyAuditStore
{
public async Task AppendAsync(ApiKeyAuditEntry entry, CancellationToken cancellationToken)
{
await using SqliteConnection connection = connectionFactory.CreateConnection();
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
await using SqliteCommand command = connection.CreateCommand();
command.CommandText = """
INSERT INTO api_key_audit (key_id, event_type, remote_address, created_utc, details)
VALUES ($key_id, $event_type, $remote_address, $created_utc, $details);
""";
command.Parameters.AddWithValue("$key_id", (object?)entry.KeyId ?? DBNull.Value);
command.Parameters.AddWithValue("$event_type", entry.EventType);
command.Parameters.AddWithValue("$remote_address", (object?)entry.RemoteAddress ?? DBNull.Value);
command.Parameters.AddWithValue("$created_utc", DateTimeOffset.UtcNow.ToString("O"));
command.Parameters.AddWithValue("$details", (object?)entry.Details ?? DBNull.Value);
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
public async Task<IReadOnlyList<ApiKeyAuditRecord>> ListRecentAsync(int count, CancellationToken cancellationToken)
{
if (count <= 0)
{
return [];
}
await using SqliteConnection connection = connectionFactory.CreateConnection();
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
await using SqliteCommand command = connection.CreateCommand();
command.CommandText = """
SELECT audit_id, key_id, event_type, remote_address, created_utc, details
FROM api_key_audit
ORDER BY audit_id DESC
LIMIT $count;
""";
command.Parameters.AddWithValue("$count", count);
List<ApiKeyAuditRecord> records = [];
await using SqliteDataReader reader = await command.ExecuteReaderAsync(cancellationToken)
.ConfigureAwait(false);
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
records.Add(new ApiKeyAuditRecord(
AuditId: reader.GetInt64(0),
KeyId: reader.IsDBNull(1) ? null : reader.GetString(1),
EventType: reader.GetString(2),
RemoteAddress: reader.IsDBNull(3) ? null : reader.GetString(3),
CreatedUtc: DateTimeOffset.Parse(
reader.GetString(4),
System.Globalization.CultureInfo.InvariantCulture),
Details: reader.IsDBNull(5) ? null : reader.GetString(5)));
}
return records;
}
}
@@ -0,0 +1,66 @@
using Microsoft.Data.Sqlite;
namespace MxGateway.Server.Security.Authentication;
public sealed class SqliteApiKeyStore(AuthSqliteConnectionFactory connectionFactory) : IApiKeyStore
{
public Task<ApiKeyRecord?> FindByKeyIdAsync(string keyId, CancellationToken cancellationToken)
{
return FindByKeyIdAsync(keyId, requireActive: false, cancellationToken);
}
public Task<ApiKeyRecord?> FindActiveByKeyIdAsync(string keyId, CancellationToken cancellationToken)
{
return FindByKeyIdAsync(keyId, requireActive: true, cancellationToken);
}
public async Task MarkKeyUsedAsync(string keyId, DateTimeOffset usedUtc, CancellationToken cancellationToken)
{
await using SqliteConnection connection = connectionFactory.CreateConnection();
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
await using SqliteCommand command = connection.CreateCommand();
command.CommandText = """
UPDATE api_keys
SET last_used_utc = $last_used_utc
WHERE key_id = $key_id AND revoked_utc IS NULL;
""";
command.Parameters.AddWithValue("$key_id", keyId);
command.Parameters.AddWithValue("$last_used_utc", usedUtc.ToString("O"));
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
private async Task<ApiKeyRecord?> FindByKeyIdAsync(
string keyId,
bool requireActive,
CancellationToken cancellationToken)
{
await using SqliteConnection connection = connectionFactory.CreateConnection();
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
await using SqliteCommand command = connection.CreateCommand();
command.CommandText = requireActive
? """
SELECT key_id, key_prefix, secret_hash, display_name, scopes, created_utc, last_used_utc, revoked_utc
FROM api_keys
WHERE key_id = $key_id AND revoked_utc IS NULL;
"""
: """
SELECT key_id, key_prefix, secret_hash, display_name, scopes, created_utc, last_used_utc, revoked_utc
FROM api_keys
WHERE key_id = $key_id;
""";
command.Parameters.AddWithValue("$key_id", keyId);
await using SqliteDataReader reader = await command.ExecuteReaderAsync(cancellationToken)
.ConfigureAwait(false);
if (!await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
return null;
}
return ApiKeyRecordReader.Read(reader);
}
}
@@ -0,0 +1,12 @@
namespace MxGateway.Server.Security.Authentication;
public static class SqliteAuthSchema
{
public const int CurrentVersion = 1;
public const string SchemaVersionTable = "schema_version";
public const string ApiKeysTable = "api_keys";
public const string ApiKeyAuditTable = "api_key_audit";
}
@@ -0,0 +1,135 @@
using Microsoft.Data.Sqlite;
namespace MxGateway.Server.Security.Authentication;
public sealed class SqliteAuthStoreMigrator(AuthSqliteConnectionFactory connectionFactory) : IAuthStoreMigrator
{
public async Task MigrateAsync(CancellationToken cancellationToken)
{
await using SqliteConnection connection = connectionFactory.CreateConnection();
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
await using SqliteTransaction transaction =
(SqliteTransaction)await connection.BeginTransactionAsync(cancellationToken).ConfigureAwait(false);
int existingVersion = await ReadExistingSchemaVersionAsync(connection, transaction, cancellationToken)
.ConfigureAwait(false);
if (existingVersion > SqliteAuthSchema.CurrentVersion)
{
throw new AuthStoreMigrationException(
$"Auth database schema version {existingVersion} is newer than supported version {SqliteAuthSchema.CurrentVersion}.");
}
await ApplyVersionOneAsync(connection, transaction, cancellationToken).ConfigureAwait(false);
await transaction.CommitAsync(cancellationToken).ConfigureAwait(false);
}
private static async Task<int> ReadExistingSchemaVersionAsync(
SqliteConnection connection,
SqliteTransaction transaction,
CancellationToken cancellationToken)
{
await using SqliteCommand tableExistsCommand = connection.CreateCommand();
tableExistsCommand.Transaction = transaction;
tableExistsCommand.CommandText = """
SELECT COUNT(*)
FROM sqlite_master
WHERE type = 'table' AND name = $table_name;
""";
tableExistsCommand.Parameters.AddWithValue("$table_name", SqliteAuthSchema.SchemaVersionTable);
long tableCount = (long)(await tableExistsCommand.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false) ?? 0L);
if (tableCount == 0)
{
return 0;
}
await using SqliteCommand versionCommand = connection.CreateCommand();
versionCommand.Transaction = transaction;
versionCommand.CommandText = """
SELECT version
FROM schema_version
WHERE id = 1;
""";
object? version = await versionCommand.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false);
return version is null || version == DBNull.Value
? 0
: Convert.ToInt32(version, System.Globalization.CultureInfo.InvariantCulture);
}
private static async Task ApplyVersionOneAsync(
SqliteConnection connection,
SqliteTransaction transaction,
CancellationToken cancellationToken)
{
await ExecuteNonQueryAsync(
connection,
transaction,
"""
CREATE TABLE IF NOT EXISTS schema_version (
id INTEGER PRIMARY KEY CHECK (id = 1),
version INTEGER NOT NULL,
applied_utc TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS api_keys (
key_id TEXT PRIMARY KEY,
key_prefix TEXT NOT NULL,
secret_hash BLOB NOT NULL,
display_name TEXT NOT NULL,
scopes TEXT NOT NULL,
created_utc TEXT NOT NULL,
last_used_utc TEXT NULL,
revoked_utc TEXT NULL
);
CREATE TABLE IF NOT EXISTS api_key_audit (
audit_id INTEGER PRIMARY KEY AUTOINCREMENT,
key_id TEXT NULL,
event_type TEXT NOT NULL,
remote_address TEXT NULL,
created_utc TEXT NOT NULL,
details TEXT NULL
);
CREATE INDEX IF NOT EXISTS ix_api_keys_revoked_utc
ON api_keys (revoked_utc);
CREATE INDEX IF NOT EXISTS ix_api_key_audit_key_id_created_utc
ON api_key_audit (key_id, created_utc);
""",
cancellationToken).ConfigureAwait(false);
await using SqliteCommand versionCommand = connection.CreateCommand();
versionCommand.Transaction = transaction;
versionCommand.CommandText = """
INSERT INTO schema_version (id, version, applied_utc)
VALUES (1, $version, $applied_utc)
ON CONFLICT(id) DO UPDATE SET
version = excluded.version,
applied_utc = excluded.applied_utc;
""";
versionCommand.Parameters.AddWithValue("$version", SqliteAuthSchema.CurrentVersion);
versionCommand.Parameters.AddWithValue("$applied_utc", DateTimeOffset.UtcNow.ToString("O"));
await versionCommand.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
private static async Task ExecuteNonQueryAsync(
SqliteConnection connection,
SqliteTransaction transaction,
string commandText,
CancellationToken cancellationToken)
{
await using SqliteCommand command = connection.CreateCommand();
command.Transaction = transaction;
command.CommandText = commandText;
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
}
@@ -0,0 +1,74 @@
using Grpc.Core;
using Grpc.Core.Interceptors;
using Microsoft.Extensions.Options;
using MxGateway.Server.Configuration;
using MxGateway.Server.Security.Authentication;
namespace MxGateway.Server.Security.Authorization;
public sealed class GatewayGrpcAuthorizationInterceptor(
IApiKeyVerifier apiKeyVerifier,
GatewayGrpcScopeResolver scopeResolver,
IGatewayRequestIdentityAccessor identityAccessor,
IOptions<GatewayOptions> options) : Interceptor
{
public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>(
TRequest request,
ServerCallContext context,
UnaryServerMethod<TRequest, TResponse> continuation)
{
ApiKeyIdentity? identity = await AuthenticateAndAuthorizeAsync(request, context).ConfigureAwait(false);
IDisposable? identityScope = identity is null ? null : identityAccessor.Push(identity);
using (identityScope)
{
return await continuation(request, context).ConfigureAwait(false);
}
}
public override async Task ServerStreamingServerHandler<TRequest, TResponse>(
TRequest request,
IServerStreamWriter<TResponse> responseStream,
ServerCallContext context,
ServerStreamingServerMethod<TRequest, TResponse> continuation)
{
ApiKeyIdentity? identity = await AuthenticateAndAuthorizeAsync(request, context).ConfigureAwait(false);
IDisposable? identityScope = identity is null ? null : identityAccessor.Push(identity);
using (identityScope)
{
await continuation(request, responseStream, context).ConfigureAwait(false);
}
}
private async Task<ApiKeyIdentity?> AuthenticateAndAuthorizeAsync<TRequest>(
TRequest request,
ServerCallContext context)
where TRequest : class
{
if (options.Value.Authentication.Mode == AuthenticationMode.Disabled)
{
return null;
}
string? authorizationHeader = context.RequestHeaders.GetValue("authorization");
ApiKeyVerificationResult verificationResult = await apiKeyVerifier
.VerifyAsync(authorizationHeader, context.CancellationToken)
.ConfigureAwait(false);
if (!verificationResult.Succeeded || verificationResult.Identity is null)
{
throw new RpcException(new Status(
StatusCode.Unauthenticated,
"Missing or invalid API key."));
}
string requiredScope = scopeResolver.ResolveRequiredScope(request);
if (!verificationResult.Identity.Scopes.Contains(requiredScope))
{
throw new RpcException(new Status(
StatusCode.PermissionDenied,
$"API key is missing required scope '{requiredScope}'."));
}
return verificationResult.Identity;
}
}
@@ -0,0 +1,40 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Security.Authorization;
public sealed class GatewayGrpcScopeResolver
{
public string ResolveRequiredScope(object request)
{
return request switch
{
OpenSessionRequest => GatewayScopes.SessionOpen,
CloseSessionRequest => GatewayScopes.SessionClose,
StreamEventsRequest => GatewayScopes.EventsRead,
MxCommandRequest commandRequest => ResolveCommandScope(commandRequest.Command?.Kind ?? MxCommandKind.Unspecified),
_ => GatewayScopes.Admin
};
}
private static string ResolveCommandScope(MxCommandKind kind)
{
return kind switch
{
MxCommandKind.Write or
MxCommandKind.Write2 => GatewayScopes.InvokeWrite,
MxCommandKind.WriteSecured or
MxCommandKind.WriteSecured2 or
MxCommandKind.AuthenticateUser => GatewayScopes.InvokeSecure,
MxCommandKind.ArchestraUserToId or
MxCommandKind.GetSessionState or
MxCommandKind.GetWorkerInfo => GatewayScopes.MetadataRead,
MxCommandKind.DrainEvents => GatewayScopes.EventsRead,
MxCommandKind.ShutdownWorker => GatewayScopes.Admin,
_ => GatewayScopes.InvokeRead
};
}
}
@@ -0,0 +1,38 @@
using MxGateway.Server.Security.Authentication;
namespace MxGateway.Server.Security.Authorization;
public sealed class GatewayRequestIdentityAccessor : IGatewayRequestIdentityAccessor
{
private readonly AsyncLocal<ApiKeyIdentity?> currentIdentity = new();
public ApiKeyIdentity? Current => currentIdentity.Value;
public IDisposable Push(ApiKeyIdentity identity)
{
ArgumentNullException.ThrowIfNull(identity);
ApiKeyIdentity? previousIdentity = currentIdentity.Value;
currentIdentity.Value = identity;
return new IdentityScope(this, previousIdentity);
}
private sealed class IdentityScope(
GatewayRequestIdentityAccessor accessor,
ApiKeyIdentity? previousIdentity) : IDisposable
{
private bool disposed;
public void Dispose()
{
if (disposed)
{
return;
}
accessor.currentIdentity.Value = previousIdentity;
disposed = true;
}
}
}
@@ -0,0 +1,13 @@
namespace MxGateway.Server.Security.Authorization;
public static class GatewayScopes
{
public const string SessionOpen = "session:open";
public const string SessionClose = "session:close";
public const string InvokeRead = "invoke:read";
public const string InvokeWrite = "invoke:write";
public const string InvokeSecure = "invoke:secure";
public const string EventsRead = "events:read";
public const string MetadataRead = "metadata:read";
public const string Admin = "admin";
}
@@ -0,0 +1,16 @@
using Grpc.Core.Interceptors;
namespace MxGateway.Server.Security.Authorization;
public static class GrpcAuthorizationServiceCollectionExtensions
{
public static IServiceCollection AddGatewayGrpcAuthorization(this IServiceCollection services)
{
services.AddSingleton<GatewayGrpcScopeResolver>();
services.AddSingleton<IGatewayRequestIdentityAccessor, GatewayRequestIdentityAccessor>();
services.AddSingleton<GatewayGrpcAuthorizationInterceptor>();
services.AddGrpc(options => options.Interceptors.Add<GatewayGrpcAuthorizationInterceptor>());
return services;
}
}
@@ -0,0 +1,10 @@
using MxGateway.Server.Security.Authentication;
namespace MxGateway.Server.Security.Authorization;
public interface IGatewayRequestIdentityAccessor
{
ApiKeyIdentity? Current { get; }
IDisposable Push(ApiKeyIdentity identity);
}
@@ -0,0 +1,352 @@
using MxGateway.Contracts.Proto;
using MxGateway.Server.Workers;
namespace MxGateway.Server.Sessions;
public sealed class GatewaySession
{
private readonly object _syncRoot = new();
private readonly SemaphoreSlim _closeLock = new(1, 1);
private IWorkerClient? _workerClient;
private SessionState _state = SessionState.Creating;
private string? _finalFault;
private DateTimeOffset _lastClientActivityAt;
private DateTimeOffset? _leaseExpiresAt;
private bool _closeStarted;
private int _activeEventSubscriberCount;
public GatewaySession(
string sessionId,
string backendName,
string pipeName,
string nonce,
string? clientIdentity,
string? clientSessionName,
string? clientCorrelationId,
TimeSpan commandTimeout,
TimeSpan startupTimeout,
TimeSpan shutdownTimeout,
DateTimeOffset openedAt)
{
if (string.IsNullOrWhiteSpace(sessionId))
{
throw new ArgumentException("Session id is required.", nameof(sessionId));
}
if (string.IsNullOrWhiteSpace(backendName))
{
throw new ArgumentException("Backend name is required.", nameof(backendName));
}
if (string.IsNullOrWhiteSpace(pipeName))
{
throw new ArgumentException("Pipe name is required.", nameof(pipeName));
}
if (string.IsNullOrWhiteSpace(nonce))
{
throw new ArgumentException("Nonce is required.", nameof(nonce));
}
SessionId = sessionId;
BackendName = backendName;
PipeName = pipeName;
Nonce = nonce;
ClientIdentity = clientIdentity;
ClientSessionName = clientSessionName;
ClientCorrelationId = clientCorrelationId;
CommandTimeout = commandTimeout;
StartupTimeout = startupTimeout;
ShutdownTimeout = shutdownTimeout;
OpenedAt = openedAt;
_lastClientActivityAt = openedAt;
}
public string SessionId { get; }
public string BackendName { get; }
public string PipeName { get; }
public string Nonce { get; }
public string? ClientIdentity { get; }
public string? ClientSessionName { get; }
public string? ClientCorrelationId { get; }
public TimeSpan CommandTimeout { get; }
public TimeSpan StartupTimeout { get; }
public TimeSpan ShutdownTimeout { get; }
public DateTimeOffset OpenedAt { get; }
public int? WorkerProcessId => _workerClient?.ProcessId;
public IWorkerClient? WorkerClient => _workerClient;
public SessionState State
{
get
{
lock (_syncRoot)
{
return _state;
}
}
}
public DateTimeOffset LastClientActivityAt
{
get
{
lock (_syncRoot)
{
return _lastClientActivityAt;
}
}
}
public DateTimeOffset? LeaseExpiresAt
{
get
{
lock (_syncRoot)
{
return _leaseExpiresAt;
}
}
}
public string? FinalFault
{
get
{
lock (_syncRoot)
{
return _finalFault;
}
}
}
public int ActiveEventSubscriberCount
{
get
{
lock (_syncRoot)
{
return _activeEventSubscriberCount;
}
}
}
public void AttachWorkerClient(IWorkerClient workerClient)
{
ArgumentNullException.ThrowIfNull(workerClient);
lock (_syncRoot)
{
_workerClient = workerClient;
}
}
public void TransitionTo(SessionState nextState)
{
lock (_syncRoot)
{
if (_state is SessionState.Closed)
{
return;
}
if (_state is SessionState.Faulted && nextState is not SessionState.Closed)
{
return;
}
_state = nextState;
}
}
public void MarkReady()
{
TransitionTo(SessionState.Ready);
}
public void MarkFaulted(string reason)
{
lock (_syncRoot)
{
if (_state is SessionState.Closed)
{
return;
}
_finalFault = reason;
_state = SessionState.Faulted;
}
}
public void TouchClientActivity(DateTimeOffset activityAt)
{
lock (_syncRoot)
{
_lastClientActivityAt = activityAt;
}
}
public void ExtendLease(DateTimeOffset leaseExpiresAt)
{
lock (_syncRoot)
{
_leaseExpiresAt = leaseExpiresAt;
}
}
public bool IsLeaseExpired(DateTimeOffset now)
{
lock (_syncRoot)
{
return _leaseExpiresAt is not null && _leaseExpiresAt <= now;
}
}
public IDisposable AttachEventSubscriber(bool allowMultipleSubscribers)
{
lock (_syncRoot)
{
if (_state != SessionState.Ready || _workerClient?.State != WorkerClientState.Ready)
{
throw new SessionManagerException(
SessionManagerErrorCode.SessionNotReady,
$"Session {SessionId} is not ready for event streaming. Current state is {_state}.");
}
if (!allowMultipleSubscribers && _activeEventSubscriberCount > 0)
{
throw new SessionManagerException(
SessionManagerErrorCode.EventSubscriberAlreadyActive,
$"Session {SessionId} already has an active event stream subscriber.");
}
_activeEventSubscriberCount++;
return new EventSubscriberLease(this);
}
}
public async Task<WorkerCommandReply> InvokeAsync(
WorkerCommand command,
CancellationToken cancellationToken)
{
IWorkerClient workerClient = GetReadyWorkerClient();
TouchClientActivity(DateTimeOffset.UtcNow);
return await workerClient.InvokeAsync(command, CommandTimeout, cancellationToken).ConfigureAwait(false);
}
public IAsyncEnumerable<WorkerEvent> ReadEventsAsync(CancellationToken cancellationToken)
{
IWorkerClient workerClient = GetReadyWorkerClient();
TouchClientActivity(DateTimeOffset.UtcNow);
return workerClient.ReadEventsAsync(cancellationToken);
}
public async Task<SessionCloseResult> CloseAsync(
string reason,
CancellationToken cancellationToken)
{
await _closeLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
if (_state is SessionState.Closed)
{
return new SessionCloseResult(SessionId, SessionState.Closed, AlreadyClosed: true);
}
bool alreadyClosing = _closeStarted;
_closeStarted = true;
_state = SessionState.Closing;
if (_workerClient is not null)
{
try
{
await _workerClient.ShutdownAsync(ShutdownTimeout, cancellationToken).ConfigureAwait(false);
}
catch
{
_workerClient.Kill(reason);
throw;
}
}
_state = SessionState.Closed;
return new SessionCloseResult(SessionId, SessionState.Closed, alreadyClosing);
}
finally
{
_closeLock.Release();
}
}
public void KillWorker(string reason)
{
_workerClient?.Kill(reason);
TransitionTo(SessionState.Closed);
}
public async ValueTask DisposeAsync()
{
_closeLock.Dispose();
if (_workerClient is not null)
{
await _workerClient.DisposeAsync().ConfigureAwait(false);
}
}
private IWorkerClient GetReadyWorkerClient()
{
lock (_syncRoot)
{
if (_state != SessionState.Ready || _workerClient?.State != WorkerClientState.Ready)
{
throw new SessionManagerException(
SessionManagerErrorCode.SessionNotReady,
$"Session {SessionId} is not ready. Current state is {_state}.");
}
return _workerClient;
}
}
private void DetachEventSubscriber()
{
lock (_syncRoot)
{
if (_activeEventSubscriberCount > 0)
{
_activeEventSubscriberCount--;
}
}
}
private sealed class EventSubscriberLease(GatewaySession session) : IDisposable
{
private bool _disposed;
public void Dispose()
{
if (_disposed)
{
return;
}
session.DetachEventSubscriber();
_disposed = true;
}
}
}
@@ -0,0 +1,34 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Sessions;
public interface ISessionManager
{
Task<GatewaySession> OpenSessionAsync(
SessionOpenRequest request,
string? clientIdentity,
CancellationToken cancellationToken);
bool TryGetSession(
string sessionId,
out GatewaySession session);
Task<WorkerCommandReply> InvokeAsync(
string sessionId,
WorkerCommand command,
CancellationToken cancellationToken);
IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
string sessionId,
CancellationToken cancellationToken);
Task<SessionCloseResult> CloseSessionAsync(
string sessionId,
CancellationToken cancellationToken);
Task<int> CloseExpiredLeasesAsync(
DateTimeOffset now,
CancellationToken cancellationToken);
Task ShutdownAsync(CancellationToken cancellationToken);
}
@@ -0,0 +1,16 @@
namespace MxGateway.Server.Sessions;
public interface ISessionRegistry
{
int Count { get; }
int ActiveCount { get; }
bool TryAdd(GatewaySession session);
bool TryGet(string sessionId, out GatewaySession session);
bool TryRemove(string sessionId, out GatewaySession session);
IReadOnlyCollection<GatewaySession> Snapshot();
}
@@ -0,0 +1,8 @@
namespace MxGateway.Server.Sessions;
public interface ISessionWorkerClientFactory
{
Task<MxGateway.Server.Workers.IWorkerClient> CreateAsync(
GatewaySession session,
CancellationToken cancellationToken);
}
@@ -0,0 +1,8 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Sessions;
public sealed record SessionCloseResult(
string SessionId,
SessionState FinalState,
bool AlreadyClosed);
@@ -0,0 +1,287 @@
using System.Security.Cryptography;
using Google.Protobuf.WellKnownTypes;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Configuration;
using MxGateway.Server.Metrics;
using MxGateway.Server.Workers;
namespace MxGateway.Server.Sessions;
public sealed class SessionManager : ISessionManager
{
public const string DefaultCloseReason = "client-close";
public const string GatewayShutdownReason = "gateway-shutdown";
public const string LeaseExpiredReason = "lease-expired";
private readonly ISessionRegistry _registry;
private readonly ISessionWorkerClientFactory _workerClientFactory;
private readonly GatewayMetrics _metrics;
private readonly TimeProvider _timeProvider;
private readonly ILogger<SessionManager> _logger;
private readonly GatewayOptions _options;
public SessionManager(
ISessionRegistry registry,
ISessionWorkerClientFactory workerClientFactory,
IOptions<GatewayOptions> options,
GatewayMetrics metrics,
TimeProvider? timeProvider = null,
ILogger<SessionManager>? logger = null)
{
_registry = registry ?? throw new ArgumentNullException(nameof(registry));
_workerClientFactory = workerClientFactory ?? throw new ArgumentNullException(nameof(workerClientFactory));
ArgumentNullException.ThrowIfNull(options);
_metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
_timeProvider = timeProvider ?? TimeProvider.System;
_logger = logger ?? NullLogger<SessionManager>.Instance;
_options = options.Value;
}
public async Task<GatewaySession> OpenSessionAsync(
SessionOpenRequest request,
string? clientIdentity,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(request);
EnsureSessionCapacity();
GatewaySession session = CreateSession(request, clientIdentity);
if (!_registry.TryAdd(session))
{
throw new SessionManagerException(
SessionManagerErrorCode.OpenFailed,
$"Session id collision while opening session {session.SessionId}.");
}
try
{
session.TransitionTo(SessionState.StartingWorker);
IWorkerClient workerClient = await _workerClientFactory
.CreateAsync(session, cancellationToken)
.ConfigureAwait(false);
session.AttachWorkerClient(workerClient);
session.MarkReady();
_metrics.SessionOpened();
return session;
}
catch (Exception exception)
{
session.MarkFaulted(exception.Message);
_registry.TryRemove(session.SessionId, out _);
await session.DisposeAsync().ConfigureAwait(false);
_metrics.Fault(SessionManagerErrorCode.OpenFailed.ToString());
_logger.LogWarning(
exception,
"Failed to open gateway session {SessionId}.",
session.SessionId);
throw new SessionManagerException(
SessionManagerErrorCode.OpenFailed,
$"Failed to open session {session.SessionId}.",
exception);
}
}
public bool TryGetSession(
string sessionId,
out GatewaySession session)
{
return _registry.TryGet(sessionId, out session);
}
public async Task<WorkerCommandReply> InvokeAsync(
string sessionId,
WorkerCommand command,
CancellationToken cancellationToken)
{
GatewaySession session = GetRequiredSession(sessionId);
try
{
return await session.InvokeAsync(command, cancellationToken).ConfigureAwait(false);
}
catch (SessionManagerException)
{
throw;
}
catch (Exception exception)
{
if (session.WorkerClient?.State == WorkerClientState.Faulted)
{
session.MarkFaulted(exception.Message);
}
throw;
}
}
public IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
string sessionId,
CancellationToken cancellationToken)
{
GatewaySession session = GetRequiredSession(sessionId);
return session.ReadEventsAsync(cancellationToken);
}
public async Task<SessionCloseResult> CloseSessionAsync(
string sessionId,
CancellationToken cancellationToken)
{
GatewaySession session = GetRequiredSession(sessionId);
SessionCloseResult result = await CloseSessionCoreAsync(
session,
DefaultCloseReason,
cancellationToken).ConfigureAwait(false);
return result;
}
public async Task<int> CloseExpiredLeasesAsync(
DateTimeOffset now,
CancellationToken cancellationToken)
{
int closedCount = 0;
foreach (GatewaySession session in _registry.Snapshot())
{
if (!session.IsLeaseExpired(now))
{
continue;
}
await CloseSessionCoreAsync(session, LeaseExpiredReason, cancellationToken).ConfigureAwait(false);
closedCount++;
}
return closedCount;
}
public async Task ShutdownAsync(CancellationToken cancellationToken)
{
foreach (GatewaySession session in _registry.Snapshot())
{
try
{
await CloseSessionCoreAsync(session, GatewayShutdownReason, cancellationToken).ConfigureAwait(false);
}
catch (Exception exception)
{
_logger.LogWarning(
exception,
"Graceful shutdown failed for session {SessionId}; killing worker.",
session.SessionId);
session.KillWorker(GatewayShutdownReason);
}
}
}
private async Task<SessionCloseResult> CloseSessionCoreAsync(
GatewaySession session,
string reason,
CancellationToken cancellationToken)
{
bool wasClosed = session.State == SessionState.Closed;
try
{
SessionCloseResult result = await session.CloseAsync(reason, cancellationToken).ConfigureAwait(false);
if (!wasClosed && !result.AlreadyClosed)
{
_metrics.SessionClosed();
}
return result;
}
catch (Exception exception)
{
session.MarkFaulted(exception.Message);
_metrics.Fault(SessionManagerErrorCode.CloseFailed.ToString());
throw new SessionManagerException(
SessionManagerErrorCode.CloseFailed,
$"Failed to close session {session.SessionId}.",
exception);
}
}
private GatewaySession GetRequiredSession(string sessionId)
{
if (!_registry.TryGet(sessionId, out GatewaySession session))
{
throw new SessionManagerException(
SessionManagerErrorCode.SessionNotFound,
$"Session {sessionId} was not found.");
}
return session;
}
private void EnsureSessionCapacity()
{
if (_registry.ActiveCount >= _options.Sessions.MaxSessions)
{
throw new SessionManagerException(
SessionManagerErrorCode.SessionLimitExceeded,
$"Gateway session limit {_options.Sessions.MaxSessions} has been reached.");
}
}
private GatewaySession CreateSession(
SessionOpenRequest request,
string? clientIdentity)
{
string sessionId = CreateSessionId();
string backendName = string.IsNullOrWhiteSpace(request.RequestedBackend)
? GatewayContractInfo.DefaultBackendName
: request.RequestedBackend!;
TimeSpan commandTimeout = ResolveCommandTimeout(request.CommandTimeout);
TimeSpan startupTimeout = TimeSpan.FromSeconds(_options.Worker.StartupTimeoutSeconds);
TimeSpan shutdownTimeout = TimeSpan.FromSeconds(_options.Worker.ShutdownTimeoutSeconds);
string pipeName = $"mxaccess-gateway-{Environment.ProcessId}-{sessionId}";
string nonce = CreateNonce();
DateTimeOffset openedAt = _timeProvider.GetUtcNow();
return new GatewaySession(
sessionId,
backendName,
pipeName,
nonce,
clientIdentity,
request.ClientSessionName,
request.ClientCorrelationId,
commandTimeout,
startupTimeout,
shutdownTimeout,
openedAt);
}
private TimeSpan ResolveCommandTimeout(Duration? requestedTimeout)
{
if (requestedTimeout is null)
{
return TimeSpan.FromSeconds(_options.Sessions.DefaultCommandTimeoutSeconds);
}
TimeSpan timeout = requestedTimeout.ToTimeSpan();
return timeout <= TimeSpan.Zero
? TimeSpan.FromSeconds(_options.Sessions.DefaultCommandTimeoutSeconds)
: timeout;
}
private static string CreateSessionId()
{
return $"session-{Guid.NewGuid():N}";
}
private static string CreateNonce()
{
Span<byte> bytes = stackalloc byte[32];
RandomNumberGenerator.Fill(bytes);
return Convert.ToBase64String(bytes);
}
}
@@ -0,0 +1,12 @@
namespace MxGateway.Server.Sessions;
public enum SessionManagerErrorCode
{
SessionNotFound,
SessionNotReady,
EventSubscriberAlreadyActive,
EventQueueOverflow,
SessionLimitExceeded,
OpenFailed,
CloseFailed,
}
@@ -0,0 +1,23 @@
namespace MxGateway.Server.Sessions;
public sealed class SessionManagerException : Exception
{
public SessionManagerException(
SessionManagerErrorCode errorCode,
string message)
: base(message)
{
ErrorCode = errorCode;
}
public SessionManagerException(
SessionManagerErrorCode errorCode,
string message,
Exception innerException)
: base(message, innerException)
{
ErrorCode = errorCode;
}
public SessionManagerErrorCode ErrorCode { get; }
}
@@ -0,0 +1,22 @@
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Sessions;
public sealed record SessionOpenRequest(
string? RequestedBackend,
string? ClientSessionName,
string? ClientCorrelationId,
Duration? CommandTimeout)
{
public static SessionOpenRequest FromContract(OpenSessionRequest request)
{
ArgumentNullException.ThrowIfNull(request);
return new SessionOpenRequest(
request.RequestedBackend,
request.ClientSessionName,
request.ClientCorrelationId,
request.CommandTimeout);
}
}
@@ -0,0 +1,39 @@
using System.Collections.Concurrent;
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Sessions;
public sealed class SessionRegistry : ISessionRegistry
{
private readonly ConcurrentDictionary<string, GatewaySession> _sessions = new(StringComparer.Ordinal);
public int Count => _sessions.Count;
public int ActiveCount => _sessions.Values.Count(session => session.State is not SessionState.Closed);
public bool TryAdd(GatewaySession session)
{
ArgumentNullException.ThrowIfNull(session);
return _sessions.TryAdd(session.SessionId, session);
}
public bool TryGet(
string sessionId,
out GatewaySession session)
{
return _sessions.TryGetValue(sessionId, out session!);
}
public bool TryRemove(
string sessionId,
out GatewaySession session)
{
return _sessions.TryRemove(sessionId, out session!);
}
public IReadOnlyCollection<GatewaySession> Snapshot()
{
return _sessions.Values.ToArray();
}
}
@@ -0,0 +1,13 @@
namespace MxGateway.Server.Sessions;
public static class SessionServiceCollectionExtensions
{
public static IServiceCollection AddGatewaySessions(this IServiceCollection services)
{
services.AddSingleton<ISessionRegistry, SessionRegistry>();
services.AddSingleton<ISessionWorkerClientFactory, SessionWorkerClientFactory>();
services.AddSingleton<ISessionManager, SessionManager>();
return services;
}
}
@@ -0,0 +1,144 @@
using System.IO.Pipes;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Configuration;
using MxGateway.Server.Metrics;
using MxGateway.Server.Workers;
namespace MxGateway.Server.Sessions;
public sealed class SessionWorkerClientFactory : ISessionWorkerClientFactory
{
private readonly IWorkerProcessLauncher _workerProcessLauncher;
private readonly GatewayMetrics _metrics;
private readonly TimeProvider _timeProvider;
private readonly ILoggerFactory _loggerFactory;
private readonly GatewayOptions _options;
public SessionWorkerClientFactory(
IWorkerProcessLauncher workerProcessLauncher,
IOptions<GatewayOptions> options,
GatewayMetrics metrics,
ILoggerFactory loggerFactory,
TimeProvider? timeProvider = null)
{
_workerProcessLauncher = workerProcessLauncher ?? throw new ArgumentNullException(nameof(workerProcessLauncher));
ArgumentNullException.ThrowIfNull(options);
_metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
_loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
_timeProvider = timeProvider ?? TimeProvider.System;
_options = options.Value;
}
public async Task<IWorkerClient> CreateAsync(
GatewaySession session,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(session);
NamedPipeServerStream? pipe = CreatePipe(session.PipeName);
WorkerProcessHandle? processHandle = null;
IWorkerClient? workerClient = null;
try
{
session.TransitionTo(SessionState.StartingWorker);
processHandle = await _workerProcessLauncher
.LaunchAsync(
new WorkerProcessLaunchRequest(
session.SessionId,
session.PipeName,
GatewayContractInfo.WorkerProtocolVersion,
session.Nonce,
pipe),
cancellationToken)
.ConfigureAwait(false);
session.TransitionTo(SessionState.WaitingForPipe);
await WaitForPipeConnectionAsync(pipe, session.StartupTimeout, cancellationToken).ConfigureAwait(false);
session.TransitionTo(SessionState.Handshaking);
WorkerFrameProtocolOptions frameOptions = new(
session.SessionId,
GatewayContractInfo.WorkerProtocolVersion,
_options.Worker.MaxMessageBytes);
WorkerClientConnection connection = new(
session.SessionId,
session.Nonce,
pipe,
frameOptions,
processHandle);
WorkerClientOptions clientOptions = new()
{
HeartbeatGrace = TimeSpan.FromSeconds(_options.Worker.HeartbeatGraceSeconds),
HeartbeatCheckInterval = TimeSpan.FromSeconds(_options.Worker.HeartbeatIntervalSeconds),
EventChannelCapacity = _options.Events.QueueCapacity,
};
workerClient = new WorkerClient(
connection,
clientOptions,
_metrics,
_timeProvider,
_loggerFactory.CreateLogger<WorkerClient>());
pipe = null;
processHandle = null;
session.TransitionTo(SessionState.InitializingWorker);
await workerClient.StartAsync(cancellationToken).ConfigureAwait(false);
return workerClient;
}
catch
{
if (workerClient is not null)
{
await workerClient.DisposeAsync().ConfigureAwait(false);
}
else
{
if (processHandle is not null)
{
try
{
if (!processHandle.Process.HasExited)
{
processHandle.Process.Kill(entireProcessTree: true);
_metrics.WorkerKilled("OpenSessionFailed");
}
}
finally
{
processHandle.Dispose();
}
}
pipe?.Dispose();
}
throw;
}
}
private static NamedPipeServerStream CreatePipe(string pipeName)
{
return new NamedPipeServerStream(
pipeName,
PipeDirection.InOut,
maxNumberOfServerInstances: 1,
PipeTransmissionMode.Byte,
PipeOptions.Asynchronous);
}
private static async Task WaitForPipeConnectionAsync(
NamedPipeServerStream pipe,
TimeSpan startupTimeout,
CancellationToken cancellationToken)
{
using CancellationTokenSource timeout = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
timeout.CancelAfter(startupTimeout);
await pipe.WaitForConnectionAsync(timeout.Token).ConfigureAwait(false);
}
}
@@ -0,0 +1,27 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Workers;
public interface IWorkerClient : IAsyncDisposable
{
string SessionId { get; }
int? ProcessId { get; }
WorkerClientState State { get; }
DateTimeOffset LastHeartbeatAt { get; }
Task StartAsync(CancellationToken cancellationToken);
Task<WorkerCommandReply> InvokeAsync(
WorkerCommand command,
TimeSpan timeout,
CancellationToken cancellationToken);
IAsyncEnumerable<WorkerEvent> ReadEventsAsync(CancellationToken cancellationToken);
Task ShutdownAsync(TimeSpan timeout, CancellationToken cancellationToken);
void Kill(string reason);
}
@@ -0,0 +1,14 @@
namespace MxGateway.Server.Workers;
public interface IWorkerProcess : IDisposable
{
int Id { get; }
bool HasExited { get; }
int? ExitCode { get; }
ValueTask WaitForExitAsync(CancellationToken cancellationToken);
void Kill(bool entireProcessTree);
}
@@ -0,0 +1,8 @@
using System.Diagnostics;
namespace MxGateway.Server.Workers;
public interface IWorkerProcessFactory
{
IWorkerProcess Start(ProcessStartInfo startInfo);
}
@@ -0,0 +1,8 @@
namespace MxGateway.Server.Workers;
public interface IWorkerProcessLauncher
{
Task<WorkerProcessHandle> LaunchAsync(
WorkerProcessLaunchRequest request,
CancellationToken cancellationToken = default);
}
@@ -0,0 +1,9 @@
namespace MxGateway.Server.Workers;
public interface IWorkerStartupProbe
{
Task WaitUntilReadyAsync(
IWorkerProcess process,
WorkerProcessLaunchRequest request,
CancellationToken cancellationToken);
}
@@ -0,0 +1,27 @@
using System.Diagnostics;
namespace MxGateway.Server.Workers;
internal sealed class SystemWorkerProcess(Process process) : IWorkerProcess
{
public int Id => process.Id;
public bool HasExited => process.HasExited;
public int? ExitCode => process.HasExited ? process.ExitCode : null;
public async ValueTask WaitForExitAsync(CancellationToken cancellationToken)
{
await process.WaitForExitAsync(cancellationToken).ConfigureAwait(false);
}
public void Kill(bool entireProcessTree)
{
process.Kill(entireProcessTree);
}
public void Dispose()
{
process.Dispose();
}
}
@@ -0,0 +1,22 @@
using System.Diagnostics;
namespace MxGateway.Server.Workers;
public sealed class SystemWorkerProcessFactory : IWorkerProcessFactory
{
public IWorkerProcess Start(ProcessStartInfo startInfo)
{
Process process = new()
{
StartInfo = startInfo,
};
if (!process.Start())
{
process.Dispose();
throw new InvalidOperationException("Worker process failed to start.");
}
return new SystemWorkerProcess(process);
}
}
@@ -0,0 +1,757 @@
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Google.Protobuf.WellKnownTypes;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Metrics;
namespace MxGateway.Server.Workers;
public sealed class WorkerClient : IWorkerClient
{
private const string GatewayVersionFallback = "unknown";
private readonly object _syncRoot = new();
private readonly WorkerClientConnection _connection;
private readonly WorkerClientOptions _options;
private readonly GatewayMetrics? _metrics;
private readonly TimeProvider _timeProvider;
private readonly ILogger<WorkerClient> _logger;
private readonly WorkerFrameReader _reader;
private readonly WorkerFrameWriter _writer;
private readonly Channel<WorkerEnvelope> _outboundEnvelopes;
private readonly Channel<WorkerEvent> _events;
private readonly ConcurrentDictionary<string, PendingCommand> _pendingCommands = new(StringComparer.Ordinal);
private readonly CancellationTokenSource _stopCts = new();
private long _nextSequence;
private WorkerClientState _state;
private DateTimeOffset _lastHeartbeatAt;
private int? _processId;
private int _eventQueueDepth;
private Task? _readLoopTask;
private Task? _writeLoopTask;
private Task? _heartbeatLoopTask;
private bool _disposed;
public WorkerClient(
WorkerClientConnection connection,
WorkerClientOptions? options = null,
GatewayMetrics? metrics = null,
TimeProvider? timeProvider = null,
ILogger<WorkerClient>? logger = null)
{
_connection = connection ?? throw new ArgumentNullException(nameof(connection));
_options = options ?? new WorkerClientOptions();
_metrics = metrics;
_timeProvider = timeProvider ?? TimeProvider.System;
_logger = logger ?? NullLogger<WorkerClient>.Instance;
_reader = new WorkerFrameReader(connection.Stream, connection.FrameOptions);
_writer = new WorkerFrameWriter(connection.Stream, connection.FrameOptions);
_outboundEnvelopes = Channel.CreateUnbounded<WorkerEnvelope>(
new UnboundedChannelOptions
{
SingleReader = true,
SingleWriter = false,
AllowSynchronousContinuations = false,
});
_events = Channel.CreateBounded<WorkerEvent>(
new BoundedChannelOptions(_options.EventChannelCapacity)
{
SingleReader = false,
SingleWriter = true,
FullMode = BoundedChannelFullMode.Wait,
AllowSynchronousContinuations = false,
});
_lastHeartbeatAt = _timeProvider.GetUtcNow();
}
public string SessionId => _connection.SessionId;
public int? ProcessId
{
get
{
lock (_syncRoot)
{
return _processId;
}
}
}
public WorkerClientState State
{
get
{
lock (_syncRoot)
{
return _state;
}
}
}
public DateTimeOffset LastHeartbeatAt
{
get
{
lock (_syncRoot)
{
return _lastHeartbeatAt;
}
}
}
public async Task StartAsync(CancellationToken cancellationToken)
{
ThrowIfDisposed();
TransitionFromCreatedToHandshaking();
_writeLoopTask = Task.Run(WriteLoopAsync);
await EnqueueAsync(CreateGatewayHelloEnvelope(), cancellationToken).ConfigureAwait(false);
WorkerEnvelope helloEnvelope = await ReadHandshakeEnvelopeAsync(
WorkerEnvelope.BodyOneofCase.WorkerHello,
cancellationToken).ConfigureAwait(false);
ValidateWorkerHello(helloEnvelope.WorkerHello);
WorkerEnvelope readyEnvelope = await ReadHandshakeEnvelopeAsync(
WorkerEnvelope.BodyOneofCase.WorkerReady,
cancellationToken).ConfigureAwait(false);
MarkReady(readyEnvelope.WorkerReady);
_readLoopTask = Task.Run(ReadLoopAsync);
_heartbeatLoopTask = Task.Run(HeartbeatLoopAsync);
}
public async Task<WorkerCommandReply> InvokeAsync(
WorkerCommand command,
TimeSpan timeout,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(command);
ThrowIfDisposed();
EnsureReady();
if (timeout <= TimeSpan.Zero)
{
throw new ArgumentOutOfRangeException(nameof(timeout), timeout, "Command timeout must be greater than zero.");
}
string correlationId = Guid.NewGuid().ToString("N");
string method = GetCommandMethod(command);
PendingCommand pendingCommand = new(
correlationId,
method,
_timeProvider.GetTimestamp());
if (!_pendingCommands.TryAdd(correlationId, pendingCommand))
{
throw new InvalidOperationException("Generated a duplicate command correlation id.");
}
_metrics?.CommandStarted(method);
try
{
await EnqueueAsync(CreateCommandEnvelope(correlationId, command), cancellationToken).ConfigureAwait(false);
using CancellationTokenSource timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
Task timeoutTask = Task.Delay(timeout, timeoutCts.Token);
Task<WorkerCommandReply> replyTask = pendingCommand.Task;
Task completedTask = await Task.WhenAny(replyTask, timeoutTask).ConfigureAwait(false);
if (completedTask == replyTask)
{
await timeoutCts.CancelAsync().ConfigureAwait(false);
return await replyTask.ConfigureAwait(false);
}
if (cancellationToken.IsCancellationRequested)
{
RemovePendingCommandAsFailed(
correlationId,
pendingCommand,
WorkerClientErrorCode.GatewayShutdown,
"Command wait was canceled.");
cancellationToken.ThrowIfCancellationRequested();
}
RemovePendingCommandAsFailed(
correlationId,
pendingCommand,
WorkerClientErrorCode.CommandTimeout,
$"Worker command {method} timed out after {timeout}.");
throw new WorkerClientException(
WorkerClientErrorCode.CommandTimeout,
$"Worker command {method} timed out after {timeout}.");
}
catch
{
_pendingCommands.TryRemove(correlationId, out _);
throw;
}
}
public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
[EnumeratorCancellation] CancellationToken cancellationToken)
{
await foreach (WorkerEvent workerEvent in _events.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
{
int queueDepth = Math.Max(0, Interlocked.Decrement(ref _eventQueueDepth));
_metrics?.SetEventQueueDepth(queueDepth);
yield return workerEvent;
}
}
public async Task ShutdownAsync(TimeSpan timeout, CancellationToken cancellationToken)
{
ThrowIfDisposed();
if (timeout <= TimeSpan.Zero)
{
throw new ArgumentOutOfRangeException(nameof(timeout), timeout, "Shutdown timeout must be greater than zero.");
}
WorkerClientState state = State;
if (state is WorkerClientState.Closed or WorkerClientState.Faulted)
{
return;
}
MarkClosing();
await EnqueueAsync(CreateShutdownEnvelope(timeout, "gateway-shutdown"), cancellationToken).ConfigureAwait(false);
_outboundEnvelopes.Writer.TryComplete();
using CancellationTokenSource timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
timeoutCts.CancelAfter(timeout);
try
{
await WaitForBackgroundTasksAsync(timeoutCts.Token).ConfigureAwait(false);
MarkClosed("shutdown");
}
catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
{
SetFaulted(
WorkerClientErrorCode.ShutdownTimeout,
"Worker shutdown timed out.",
null);
throw new WorkerClientException(
WorkerClientErrorCode.ShutdownTimeout,
$"Worker shutdown timed out after {timeout}.");
}
}
public void Kill(string reason)
{
ThrowIfDisposed();
_connection.ProcessHandle?.Process.Kill(entireProcessTree: true);
_metrics?.WorkerKilled(reason);
SetFaulted(
WorkerClientErrorCode.WorkerFaulted,
$"Worker was killed by the gateway: {reason}.",
null);
}
public async ValueTask DisposeAsync()
{
if (_disposed)
{
return;
}
_disposed = true;
_stopCts.Cancel();
_outboundEnvelopes.Writer.TryComplete();
_events.Writer.TryComplete();
CompletePendingCommands(
new WorkerClientException(
WorkerClientErrorCode.GatewayShutdown,
"Worker client was disposed."));
await WaitForBackgroundTasksAsync(CancellationToken.None).ConfigureAwait(false);
await _connection.Stream.DisposeAsync().ConfigureAwait(false);
_connection.ProcessHandle?.Dispose();
_stopCts.Dispose();
}
private async Task WriteLoopAsync()
{
try
{
await foreach (WorkerEnvelope envelope in _outboundEnvelopes.Reader.ReadAllAsync(_stopCts.Token).ConfigureAwait(false))
{
await _writer.WriteAsync(envelope, _stopCts.Token).ConfigureAwait(false);
}
}
catch (OperationCanceledException) when (_stopCts.IsCancellationRequested || IsTerminalState())
{
}
catch (Exception exception)
{
SetFaulted(
WorkerClientErrorCode.WriteFailed,
"Worker pipe write failed.",
exception);
}
}
private async Task ReadLoopAsync()
{
try
{
while (!_stopCts.IsCancellationRequested)
{
WorkerEnvelope envelope = await _reader.ReadAsync(_stopCts.Token).ConfigureAwait(false);
await DispatchEnvelopeAsync(envelope, _stopCts.Token).ConfigureAwait(false);
}
}
catch (OperationCanceledException) when (_stopCts.IsCancellationRequested || IsTerminalState())
{
}
catch (WorkerFrameProtocolException exception) when (exception.ErrorCode == WorkerFrameProtocolErrorCode.EndOfStream)
{
SetFaulted(
WorkerClientErrorCode.PipeDisconnected,
"Worker pipe disconnected.",
exception);
}
catch (Exception exception)
{
SetFaulted(
WorkerClientErrorCode.ProtocolViolation,
"Worker read loop failed.",
exception);
}
}
private async Task HeartbeatLoopAsync()
{
try
{
while (!_stopCts.IsCancellationRequested)
{
await Task.Delay(_options.HeartbeatCheckInterval, _stopCts.Token).ConfigureAwait(false);
if (State != WorkerClientState.Ready)
{
continue;
}
DateTimeOffset lastHeartbeatAt = LastHeartbeatAt;
DateTimeOffset now = _timeProvider.GetUtcNow();
if (now - lastHeartbeatAt <= _options.HeartbeatGrace)
{
continue;
}
_metrics?.HeartbeatFailed(SessionId);
SetFaulted(
WorkerClientErrorCode.HeartbeatExpired,
$"Worker heartbeat expired. Last heartbeat was at {lastHeartbeatAt:O}.",
null);
}
}
catch (OperationCanceledException) when (_stopCts.IsCancellationRequested || IsTerminalState())
{
}
}
private async Task DispatchEnvelopeAsync(
WorkerEnvelope envelope,
CancellationToken cancellationToken)
{
switch (envelope.BodyCase)
{
case WorkerEnvelope.BodyOneofCase.WorkerCommandReply:
CompleteCommand(envelope);
break;
case WorkerEnvelope.BodyOneofCase.WorkerEvent:
await EnqueueWorkerEventAsync(envelope.WorkerEvent, cancellationToken).ConfigureAwait(false);
break;
case WorkerEnvelope.BodyOneofCase.WorkerHeartbeat:
MarkHeartbeat(envelope.WorkerHeartbeat);
break;
case WorkerEnvelope.BodyOneofCase.WorkerFault:
SetFaulted(
WorkerClientErrorCode.WorkerFaulted,
CreateWorkerFaultMessage(envelope.WorkerFault),
null);
break;
case WorkerEnvelope.BodyOneofCase.WorkerShutdownAck:
MarkClosed("worker-shutdown-ack");
break;
default:
SetFaulted(
WorkerClientErrorCode.ProtocolViolation,
$"Worker sent unexpected envelope body {envelope.BodyCase}.",
null);
break;
}
}
private async Task EnqueueWorkerEventAsync(
WorkerEvent workerEvent,
CancellationToken cancellationToken)
{
if (workerEvent.Event is not null)
{
_metrics?.EventReceived(SessionId, workerEvent.Event.Family.ToString());
}
if (!_events.Writer.TryWrite(workerEvent))
{
_metrics?.QueueOverflow("worker-events");
SetFaulted(
WorkerClientErrorCode.ProtocolViolation,
"Worker event channel rejected an event.",
null);
return;
}
int queueDepth = Interlocked.Increment(ref _eventQueueDepth);
_metrics?.SetEventQueueDepth(queueDepth);
}
private void CompleteCommand(WorkerEnvelope envelope)
{
string correlationId = envelope.CorrelationId;
if (string.IsNullOrWhiteSpace(correlationId))
{
correlationId = envelope.WorkerCommandReply.Reply?.CorrelationId ?? string.Empty;
}
if (!_pendingCommands.TryRemove(correlationId, out PendingCommand? pendingCommand))
{
_logger.LogDebug(
"Ignoring late or unknown worker command reply for session {SessionId} and correlation {CorrelationId}.",
SessionId,
correlationId);
return;
}
TimeSpan duration = _timeProvider.GetElapsedTime(pendingCommand.StartTimestamp);
_metrics?.CommandSucceeded(pendingCommand.Method, duration);
pendingCommand.SetResult(envelope.WorkerCommandReply);
}
private void RemovePendingCommandAsFailed(
string correlationId,
PendingCommand pendingCommand,
WorkerClientErrorCode errorCode,
string message)
{
if (!_pendingCommands.TryRemove(correlationId, out _))
{
return;
}
TimeSpan duration = _timeProvider.GetElapsedTime(pendingCommand.StartTimestamp);
_metrics?.CommandFailed(pendingCommand.Method, errorCode.ToString(), duration);
pendingCommand.SetException(new WorkerClientException(errorCode, message));
}
private async Task<WorkerEnvelope> ReadHandshakeEnvelopeAsync(
WorkerEnvelope.BodyOneofCase expectedBody,
CancellationToken cancellationToken)
{
WorkerEnvelope envelope = await _reader.ReadAsync(cancellationToken).ConfigureAwait(false);
if (envelope.BodyCase != expectedBody)
{
throw new WorkerClientException(
WorkerClientErrorCode.ProtocolViolation,
$"Worker handshake expected {expectedBody} but received {envelope.BodyCase}.");
}
return envelope;
}
private void ValidateWorkerHello(WorkerHello workerHello)
{
if (workerHello.ProtocolVersion != _connection.FrameOptions.ProtocolVersion)
{
throw new WorkerClientException(
WorkerClientErrorCode.ProtocolViolation,
"Worker hello protocol version does not match the gateway protocol version.");
}
if (!string.Equals(workerHello.Nonce, _connection.Nonce, StringComparison.Ordinal))
{
throw new WorkerClientException(
WorkerClientErrorCode.ProtocolViolation,
"Worker hello nonce does not match the gateway nonce.");
}
lock (_syncRoot)
{
_processId = workerHello.WorkerProcessId == 0
? _connection.ProcessHandle?.ProcessId
: workerHello.WorkerProcessId;
}
}
private void MarkReady(WorkerReady ready)
{
lock (_syncRoot)
{
_processId = ready.WorkerProcessId == 0
? _processId ?? _connection.ProcessHandle?.ProcessId
: ready.WorkerProcessId;
_lastHeartbeatAt = _timeProvider.GetUtcNow();
_state = WorkerClientState.Ready;
}
DateTimeOffset readyAt = _timeProvider.GetUtcNow();
DateTimeOffset launchedAt = _connection.ProcessHandle?.LaunchedAt ?? readyAt;
_metrics?.WorkerStarted(readyAt - launchedAt);
}
private void MarkHeartbeat(WorkerHeartbeat heartbeat)
{
lock (_syncRoot)
{
_lastHeartbeatAt = _timeProvider.GetUtcNow();
if (heartbeat.WorkerProcessId != 0)
{
_processId = heartbeat.WorkerProcessId;
}
}
}
private void MarkClosing()
{
lock (_syncRoot)
{
if (_state is WorkerClientState.Closed or WorkerClientState.Faulted)
{
return;
}
_state = WorkerClientState.Closing;
}
}
private void MarkClosed(string reason)
{
lock (_syncRoot)
{
if (_state == WorkerClientState.Closed)
{
return;
}
_state = WorkerClientState.Closed;
}
_stopCts.Cancel();
_outboundEnvelopes.Writer.TryComplete();
_events.Writer.TryComplete();
CompletePendingCommands(
new WorkerClientException(
WorkerClientErrorCode.GatewayShutdown,
$"Worker client closed because {reason}."));
_metrics?.WorkerStopped(reason);
}
private void SetFaulted(
WorkerClientErrorCode errorCode,
string message,
Exception? exception)
{
WorkerClientException fault = exception is null
? new WorkerClientException(errorCode, message)
: new WorkerClientException(errorCode, message, exception);
lock (_syncRoot)
{
if (_state is WorkerClientState.Faulted or WorkerClientState.Closed)
{
return;
}
_state = WorkerClientState.Faulted;
}
_stopCts.Cancel();
_outboundEnvelopes.Writer.TryComplete(fault);
_events.Writer.TryComplete(fault);
CompletePendingCommands(fault);
_metrics?.Fault(errorCode.ToString());
_logger.LogWarning(exception, "Worker client faulted for session {SessionId}: {Message}", SessionId, message);
}
private void CompletePendingCommands(Exception exception)
{
foreach (KeyValuePair<string, PendingCommand> item in _pendingCommands.ToArray())
{
if (_pendingCommands.TryRemove(item.Key, out PendingCommand? pendingCommand))
{
TimeSpan duration = _timeProvider.GetElapsedTime(pendingCommand.StartTimestamp);
_metrics?.CommandFailed(pendingCommand.Method, exception.GetType().Name, duration);
pendingCommand.SetException(exception);
}
}
}
private void TransitionFromCreatedToHandshaking()
{
lock (_syncRoot)
{
if (_state != WorkerClientState.Created)
{
throw new WorkerClientException(
WorkerClientErrorCode.InvalidState,
$"Worker client cannot start from state {_state}.");
}
_state = WorkerClientState.Handshaking;
}
}
private void EnsureReady()
{
WorkerClientState state = State;
if (state != WorkerClientState.Ready)
{
throw new WorkerClientException(
WorkerClientErrorCode.InvalidState,
$"Worker client is not ready. Current state is {state}.");
}
}
private bool IsTerminalState()
{
WorkerClientState state = State;
return state is WorkerClientState.Closing or WorkerClientState.Closed or WorkerClientState.Faulted;
}
private async Task EnqueueAsync(
WorkerEnvelope envelope,
CancellationToken cancellationToken)
{
try
{
await _outboundEnvelopes.Writer.WriteAsync(envelope, cancellationToken).ConfigureAwait(false);
}
catch (ChannelClosedException exception)
{
throw new WorkerClientException(
WorkerClientErrorCode.WriteFailed,
"Worker outbound channel is closed.",
exception);
}
}
private WorkerEnvelope CreateGatewayHelloEnvelope()
{
return CreateEnvelope(
correlationId: string.Empty,
envelope => envelope.GatewayHello = new GatewayHello
{
SupportedProtocolVersion = _connection.FrameOptions.ProtocolVersion,
Nonce = _connection.Nonce,
GatewayVersion = typeof(GatewayContractInfo).Assembly.GetName().Version?.ToString() ?? GatewayVersionFallback,
});
}
private WorkerEnvelope CreateCommandEnvelope(
string correlationId,
WorkerCommand command)
{
return CreateEnvelope(
correlationId,
envelope => envelope.WorkerCommand = command.Clone());
}
private WorkerEnvelope CreateShutdownEnvelope(
TimeSpan timeout,
string reason)
{
return CreateEnvelope(
correlationId: string.Empty,
envelope => envelope.WorkerShutdown = new WorkerShutdown
{
GracePeriod = Duration.FromTimeSpan(timeout),
Reason = reason,
});
}
private WorkerEnvelope CreateEnvelope(
string correlationId,
Action<WorkerEnvelope> setBody)
{
WorkerEnvelope envelope = new()
{
ProtocolVersion = _connection.FrameOptions.ProtocolVersion,
SessionId = SessionId,
Sequence = (ulong)Interlocked.Increment(ref _nextSequence),
CorrelationId = correlationId,
};
setBody(envelope);
return envelope;
}
private static string GetCommandMethod(WorkerCommand command)
{
return command.Command?.Kind.ToString() ?? MxCommandKind.Unspecified.ToString();
}
private static string CreateWorkerFaultMessage(WorkerFault fault)
{
return string.IsNullOrWhiteSpace(fault.DiagnosticMessage)
? $"Worker faulted with category {fault.Category}."
: $"Worker faulted with category {fault.Category}: {fault.DiagnosticMessage}";
}
private async Task WaitForBackgroundTasksAsync(CancellationToken cancellationToken)
{
Task[] tasks = new[] { _readLoopTask, _writeLoopTask, _heartbeatLoopTask }
.Where(task => task is not null)
.Cast<Task>()
.ToArray();
if (tasks.Length == 0)
{
return;
}
await Task.WhenAll(tasks).WaitAsync(cancellationToken).ConfigureAwait(false);
}
private void ThrowIfDisposed()
{
ObjectDisposedException.ThrowIf(_disposed, this);
}
private sealed class PendingCommand
{
private readonly TaskCompletionSource<WorkerCommandReply> _completion = new(TaskCreationOptions.RunContinuationsAsynchronously);
public PendingCommand(
string correlationId,
string method,
long startTimestamp)
{
CorrelationId = correlationId;
Method = method;
StartTimestamp = startTimestamp;
}
public string CorrelationId { get; }
public string Method { get; }
public long StartTimestamp { get; }
public Task<WorkerCommandReply> Task => _completion.Task;
public void SetResult(WorkerCommandReply reply)
{
_completion.TrySetResult(reply);
}
public void SetException(Exception exception)
{
_completion.TrySetException(exception);
}
}
}
@@ -0,0 +1,38 @@
namespace MxGateway.Server.Workers;
public sealed class WorkerClientConnection
{
public WorkerClientConnection(
string sessionId,
string nonce,
Stream stream,
WorkerFrameProtocolOptions frameOptions,
WorkerProcessHandle? processHandle = null)
{
if (string.IsNullOrWhiteSpace(sessionId))
{
throw new ArgumentException("Session id is required.", nameof(sessionId));
}
if (string.IsNullOrWhiteSpace(nonce))
{
throw new ArgumentException("Worker nonce is required.", nameof(nonce));
}
SessionId = sessionId;
Nonce = nonce;
Stream = stream ?? throw new ArgumentNullException(nameof(stream));
FrameOptions = frameOptions ?? throw new ArgumentNullException(nameof(frameOptions));
ProcessHandle = processHandle;
}
public string SessionId { get; }
public string Nonce { get; }
public Stream Stream { get; }
public WorkerFrameProtocolOptions FrameOptions { get; }
public WorkerProcessHandle? ProcessHandle { get; }
}
@@ -0,0 +1,14 @@
namespace MxGateway.Server.Workers;
public enum WorkerClientErrorCode
{
InvalidState,
ProtocolViolation,
PipeDisconnected,
CommandTimeout,
WorkerFaulted,
HeartbeatExpired,
ShutdownTimeout,
GatewayShutdown,
WriteFailed,
}
@@ -0,0 +1,23 @@
namespace MxGateway.Server.Workers;
public sealed class WorkerClientException : Exception
{
public WorkerClientException(
WorkerClientErrorCode errorCode,
string message)
: base(message)
{
ErrorCode = errorCode;
}
public WorkerClientException(
WorkerClientErrorCode errorCode,
string message,
Exception innerException)
: base(message, innerException)
{
ErrorCode = errorCode;
}
public WorkerClientErrorCode ErrorCode { get; }
}
@@ -0,0 +1,24 @@
namespace MxGateway.Server.Workers;
public sealed class WorkerClientOptions
{
public static readonly TimeSpan DefaultHeartbeatGrace = TimeSpan.FromSeconds(15);
public static readonly TimeSpan DefaultHeartbeatCheckInterval = TimeSpan.FromSeconds(1);
public static readonly TimeSpan DefaultEventChannelFullModeTimeout = TimeSpan.FromSeconds(5);
public WorkerClientOptions()
{
HeartbeatGrace = DefaultHeartbeatGrace;
HeartbeatCheckInterval = DefaultHeartbeatCheckInterval;
EventChannelCapacity = 1_024;
EventChannelFullModeTimeout = DefaultEventChannelFullModeTimeout;
}
public TimeSpan HeartbeatGrace { get; init; }
public TimeSpan HeartbeatCheckInterval { get; init; }
public int EventChannelCapacity { get; init; }
public TimeSpan EventChannelFullModeTimeout { get; init; }
}
@@ -0,0 +1,11 @@
namespace MxGateway.Server.Workers;
public enum WorkerClientState
{
Created,
Handshaking,
Ready,
Closing,
Closed,
Faulted,
}

Some files were not shown because too many files have changed in this diff Show More