Compare commits

...

34 Commits

Author SHA1 Message Date
Joseph Doherty 451dccf7e3 Issue #24: create mxaccess com object on sta 2026-04-26 17:34:12 -04:00
dohertj2 5511609880 Merge PR #68: Issue #12 implement session manager and registry
Verified with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln.
2026-04-26 17:34:19 -04:00
dohertj2 cde9c89386 Merge PR #67: Issue #30 implement value conversion
Verified with dotnet build src\\MxGateway.sln, dotnet test src\\MxGateway.Worker.Tests\\MxGateway.Worker.Tests.csproj -p:Platform=x86, and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 17:30:19 -04:00
Joseph Doherty d496f1fd75 Issue #12: implement session manager and registry 2026-04-26 17:29:47 -04:00
Joseph Doherty 6559672fc1 Issue #30: implement value conversion 2026-04-26 17:26:36 -04:00
dohertj2 97c30b9d00 Merge PR #66: Issue #23 implement STA runtime and message pump
Verified with dotnet build src\\MxGateway.sln, dotnet test src\\MxGateway.Worker.Tests\\MxGateway.Worker.Tests.csproj -p:Platform=x86, and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 17:23:02 -04:00
dohertj2 603aff7004 Merge PR #65: Issue #22 implement pipe client and frame protocol
Verified with dotnet build src\\MxGateway.sln, dotnet test src\\MxGateway.Worker.Tests\\MxGateway.Worker.Tests.csproj -p:Platform=x86, and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 17:20:28 -04:00
Joseph Doherty e81682e367 Issue #23: implement sta runtime and message pump 2026-04-26 17:19:00 -04:00
Joseph Doherty d5a982152b Issue #22: implement pipe client and frame protocol 2026-04-26 17:16:49 -04:00
dohertj2 0b0be7098e Merge PR #64: Issue #11 implement gateway WorkerClient
Verified with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln.
2026-04-26 17:14:03 -04:00
Joseph Doherty fce9e99553 Issue #11: implement gateway workerclient 2026-04-26 17:09:51 -04:00
dohertj2 c8fb3e91a3 Merge PR #63: Issue #8 add gRPC authentication and scope authorization
Verified with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln.
2026-04-26 17:06:23 -04:00
dohertj2 8ce327e6f4 Merge PR #62: Issue #7 implement local api key admin cli
Verified with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln.
2026-04-26 17:02:09 -04:00
Joseph Doherty fad0ac9948 Issue #8: add grpc authentication and scope authorization 2026-04-26 17:01:59 -04:00
dohertj2 9cb2f1c5cd Merge PR #61: Issue #21 implement worker bootstrap and options
Verified with dotnet build src\\MxGateway.sln, dotnet test src\\MxGateway.Worker.Tests\\MxGateway.Worker.Tests.csproj -p:Platform=x86, and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 16:56:52 -04:00
Joseph Doherty da9ffe0e11 Issue #7: implement local api key admin cli 2026-04-26 16:56:12 -04:00
Joseph Doherty 0af1427859 Issue #21: implement worker bootstrap and options 2026-04-26 16:53:06 -04:00
dohertj2 e2b4dfcb32 Merge PR #60: Issue #10 implement worker process launcher
Verified with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 16:50:00 -04:00
dohertj2 3b3e41acf4 Merge PR #59: Issue #6 implement API key hashing and verification
Verified with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln.
2026-04-26 16:46:06 -04:00
Joseph Doherty c1188c6957 Issue #10: implement worker process launcher 2026-04-26 16:45:42 -04:00
dohertj2 4094e64ee0 Merge PR #58: Issue #20 scaffold worker project
Verified with dotnet build src\\MxGateway.sln, dotnet test src\\MxGateway.Worker.Tests\\MxGateway.Worker.Tests.csproj -p:Platform=x86, and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 16:41:34 -04:00
Joseph Doherty 696be17139 Issue #6: implement api key hashing and verification 2026-04-26 16:40:46 -04:00
Joseph Doherty b42c3c8b3b Issue #20: scaffold worker project 2026-04-26 16:37:23 -04:00
dohertj2 420a813967 Merge PR #56: Issue #5 implement SQLite auth store and migrations
Verified with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln.
2026-04-26 16:34:28 -04:00
Joseph Doherty ec1155de6d Issue #5: implement sqlite auth store and migrations 2026-04-26 16:29:38 -04:00
dohertj2 0c539834dc Merge PR #55: Issue #9 implement worker frame protocol
Verified with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln.
2026-04-26 16:24:38 -04:00
Joseph Doherty a5098e6815 Issue #9: implement worker frame protocol 2026-04-26 16:20:59 -04:00
dohertj2 41ddd122a6 Merge PR #53: Issue #4 add structured logging and metrics foundation
Verified after merging origin/main and resolving startup/test conflicts with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln.
2026-04-26 16:19:24 -04:00
Joseph Doherty a25f09e795 Merge remote-tracking branch 'origin/main' into agent-3/issue-4-add-structured-logging-and-metrics-foundation
# Conflicts:
#	src/MxGateway.Server/GatewayApplication.cs
#	src/MxGateway.Tests/Gateway/GatewayApplicationTests.cs
2026-04-26 16:17:22 -04:00
dohertj2 37da9d8f44 Merge PR #54: Issue #3 add gateway configuration and validation
Verified after merging origin/main with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln.
2026-04-26 16:17:20 -04:00
Joseph Doherty a19af5f7cb Merge remote-tracking branch 'origin/main' into agent-2/issue-3-add-gateway-configuration-and-validation 2026-04-26 16:14:23 -04:00
dohertj2 03ab36c4d5 Merge PR #52: Issue #2 define protobuf contracts
Verified with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 16:14:07 -04:00
Joseph Doherty 91ea71b0b7 Issue #3: add gateway configuration and validation 2026-04-26 16:11:30 -04:00
Joseph Doherty 7dfec6dc8c Issue #4: add structured logging and metrics foundation 2026-04-26 16:10:58 -04:00
196 changed files with 12172 additions and 9 deletions
+54
View File
@@ -0,0 +1,54 @@
# Worker Frame Protocol
The gateway uses the worker frame protocol to move `WorkerEnvelope` protobuf
messages over a bidirectional named pipe. The frame layer is deliberately small:
it handles message boundaries, size limits, protobuf parsing, and envelope
validation before higher-level worker client code routes commands, replies,
events, and faults.
## Frame Format
Each frame starts with a four-byte little-endian unsigned payload length,
followed by the serialized `WorkerEnvelope` payload:
```text
uint32 little-endian payload_length
payload_length bytes protobuf WorkerEnvelope
```
The reader rejects zero-length payloads and payloads larger than the configured
maximum before allocating the payload buffer. The default maximum is 16 MiB,
matching the gateway process design.
## Envelope Validation
`WorkerFrameReader` and `WorkerFrameWriter` validate each envelope against the
owning session before returning or writing it:
- `protocol_version` must match the configured worker protocol version,
- `session_id` must match the owning gateway session,
- the envelope must contain one typed `body` value.
Protocol violations throw `WorkerFrameProtocolException` with a
`WorkerFrameProtocolErrorCode` so callers can distinguish malformed frames,
oversized frames, protocol version mismatches, and session mismatches.
## Verification
Run the focused tests after changing the frame protocol:
```bash
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter WorkerFrameProtocolTests
```
Run the gateway build because the frame protocol is part of
`MxGateway.Server`:
```bash
dotnet build src/MxGateway.Server/MxGateway.Server.csproj
```
## Related Documentation
- [Gateway Process Detailed Design](./gateway-process-design.md)
- [Protobuf Contracts](./Contracts.md)
+62
View File
@@ -0,0 +1,62 @@
# Worker Process Launcher
The gateway uses `WorkerProcessLauncher` to validate and start one worker
process for a gateway session. The launcher owns process start semantics only;
pipe handshaking and `WorkerReady` validation remain part of the worker client
startup path.
## Launch Inputs
`WorkerProcessLaunchRequest` carries the per-session bootstrap values:
- `SessionId`,
- `PipeName`,
- `ProtocolVersion`,
- `Nonce`,
- optional `PipeReservation` cleanup handle.
The launcher passes `SessionId`, `PipeName`, and `ProtocolVersion` as command
line arguments:
```text
--session-id <sessionId> --pipe-name <pipeName> --protocol-version <version>
```
The launcher sets the nonce through the `MXGATEWAY_WORKER_NONCE` environment
variable. The nonce is not included in `WorkerProcessCommandLine` so logs and
diagnostics can report the launch command without exposing the secret.
## Validation And Cleanup
Before starting the process, the launcher validates that the configured worker
path exists, has a `.exe` extension, contains a valid Windows Portable
Executable header, and matches the configured `RequiredArchitecture`.
After the process starts, `IWorkerStartupProbe` waits for startup readiness.
The default probe only verifies that the worker did not exit immediately. The
worker client replaces this probe when pipe connection, hello, and
`WorkerReady` handling are implemented.
If startup fails or exceeds `WorkerOptions.StartupTimeoutSeconds`, the launcher
kills the worker process tree, disposes the process handle, disposes the
optional pipe reservation, records a worker kill metric, and reports a
`WorkerProcessLaunchException`.
## Verification
Run the focused launcher tests after changing process launch behavior:
```bash
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter WorkerProcessLauncherTests
```
Run the gateway build because the launcher is part of `MxGateway.Server`:
```bash
dotnet build src/MxGateway.Server/MxGateway.Server.csproj
```
## Related Documentation
- [Gateway Process Detailed Design](./gateway-process-design.md)
- [Worker Frame Protocol](./WorkerFrameProtocol.md)
+6 -1
View File
@@ -105,6 +105,12 @@ Do not let Razor components directly mutate gateway session or worker objects.
Create a small read-only dashboard service that projects gateway state into
plain DTOs.
`GatewayMetrics.GetSnapshot()` is the metrics input for the first dashboard
projection. It carries current session and worker gauges, command and event
counters, queue depth, and fault totals. The dashboard reads that snapshot
instead of reading raw `Meter` instruments because exporter configuration is an
operations concern, not a UI dependency.
Suggested service:
```csharp
@@ -361,4 +367,3 @@ The first dashboard slice should implement:
8. workers page with worker table.
9. 1-second realtime refresh through Blazor Server.
10. redaction tests for secrets.
+137 -2
View File
@@ -330,6 +330,20 @@ The worker remains authoritative for MXAccess handles. The gateway may keep a
shadow state for diagnostics, but it must not invent, rewrite, or recycle
MXAccess handles.
`SessionManager` owns the current in-memory session registry. It allocates a
session id, creates the worker pipe name and nonce, registers the session before
worker startup, and removes the session if startup fails. A successful
`OpenSession` attaches the ready `IWorkerClient` and transitions the session to
`Ready`.
Only `Ready` sessions accept command and event operations. `CloseSession` is
idempotent for sessions still known to the registry: the first close shuts down
the worker, and later closes return the final `Closed` state. Lease handling is
exposed as a session hook so a monitor can close expired sessions without
embedding lease policy in the worker client. Gateway shutdown walks the
registry, closes each known session, and kills a worker if graceful shutdown
fails.
## Worker Launch
The gateway should launch the worker using explicit configuration:
@@ -360,6 +374,15 @@ Before launch, validate:
- worker file version or product version is acceptable,
- worker is expected to be x86.
`WorkerProcessLauncher` implements the first validation layer now: it resolves
the worker executable path, requires a `.exe`, validates the Windows Portable
Executable header, and verifies the configured processor architecture. It passes
only `--session-id`, `--pipe-name`, and `--protocol-version` on the command
line. The per-session nonce is set through `MXGATEWAY_WORKER_NONCE` so the
command line remains safe to log. Startup failures and startup timeouts kill and
dispose the worker process and the pre-created pipe reservation before the
session manager observes the failure.
## Worker IPC
The gateway creates the pipe server before launching the worker.
@@ -402,7 +425,7 @@ session ids as protocol faults and close the session.
`WorkerClient` is the gateway-side object that owns one worker connection.
Suggested public shape:
Current public shape:
```csharp
public interface IWorkerClient : IAsyncDisposable
@@ -410,6 +433,7 @@ public interface IWorkerClient : IAsyncDisposable
string SessionId { get; }
int? ProcessId { get; }
WorkerClientState State { get; }
DateTimeOffset LastHeartbeatAt { get; }
Task StartAsync(CancellationToken cancellationToken);
Task<WorkerCommandReply> InvokeAsync(
@@ -429,12 +453,17 @@ Internally it owns:
- pipe stream,
- read loop,
- write loop,
- bounded outbound command/control channel,
- outbound command/control channel serialized by the write loop,
- bounded inbound event channel,
- pending command dictionary keyed by correlation id,
- heartbeat monitor,
- terminal fault source.
`StartAsync` sends `GatewayHello`, verifies the `WorkerHello` protocol version
and nonce, waits for `WorkerReady`, and only then exposes `Ready` state. The
read loop starts after readiness so the handshake has a single owner for its
ordered frames.
### Read Loop
The read loop:
@@ -589,6 +618,29 @@ The gateway should split the key into a stable key id and secret component,
load the key record by id, hash the presented secret, and compare using a
constant-time comparison.
`ApiKeyParser` accepts only `authorization: Bearer mxgw_<key-id>_<secret>`.
Malformed headers fail before any database lookup. The parsed raw secret is
kept only long enough for `ApiKeySecretHasher` to compute an HMAC-SHA256 hash
using the configured `Authentication:PepperSecretName` lookup in application
configuration. The raw secret is not stored in the auth database, identity
model, logs, or verification result.
`ApiKeyVerifier` loads the stored key record by key id, rejects revoked keys,
hashes the presented secret, and compares the stored and presented hashes with
`CryptographicOperations.FixedTimeEquals`. A successful verification returns an
`ApiKeyIdentity` with key id, key prefix, display name, and scopes. Failure
results distinguish malformed credentials, missing keys, revoked keys, missing
pepper configuration, and hash mismatch for internal authorization handling.
`GatewayGrpcAuthorizationInterceptor` enforces this authentication model for
public gRPC calls. Missing, malformed, revoked, unknown, or mismatched keys fail
with `Unauthenticated`. Authenticated calls missing the scope required by the
RPC fail with `PermissionDenied`. The interceptor applies to unary calls and
server-streaming calls and stores the authenticated `ApiKeyIdentity` in
`IGatewayRequestIdentityAccessor` for the duration of the request handler.
`Authentication:Mode` set to `Disabled` bypasses API-key verification for local
development only.
Recommended scopes:
- `session:open`
@@ -608,10 +660,44 @@ gRPC admin API. It should initialize the auth database, create keys, list keys
without secrets, revoke keys, rotate keys, and print raw secrets only once at
creation.
`MxGateway.Server` exposes local API-key administration as an `apikey`
subcommand before the web host starts:
```bash
MxGateway.Server apikey init-db --sqlite-path C:\ProgramData\MxGateway\gateway-auth.db
MxGateway.Server apikey create-key --key-id operator01 --display-name Operator --scopes session:open,events:read
MxGateway.Server apikey list-keys --json
MxGateway.Server apikey revoke-key --key-id operator01
MxGateway.Server apikey rotate-key --key-id operator01 --json
```
The subcommands accept `--sqlite-path`, `--pepper`, and `--json`. `--pepper`
sets the local `MxGateway:ApiKeyPepper` configuration value for the command
process; deployments should normally provide the pepper through the configured
secret source. `create-key` and `rotate-key` print the full raw API key exactly
once. `list-keys` never prints raw secrets or `secret_hash` values.
SQLite auth storage should use startup migrations with a `schema_version` table.
Migrations should run inside transactions and fail startup if the database
schema is newer than the running binary understands.
The v1 auth store uses `Microsoft.Data.Sqlite` and creates the
`schema_version`, `api_keys`, and `api_key_audit` tables through
`SqliteAuthStoreMigrator`. `AuthStoreMigrationHostedService` runs those
migrations at gateway startup when API-key authentication and
`Authentication:RunMigrationsOnStartup` are enabled. A database with a newer
schema version fails startup instead of being modified by an older gateway
binary.
`IApiKeyStore` reads stored key records and exposes an active-key lookup that
excludes rows with `revoked_utc` set. Hash verification belongs to the API-key
hashing layer, but the store preserves the `secret_hash` bytes, display name,
scopes, timestamps, and revocation state needed by that layer.
`IApiKeyAuditStore` appends audit events to `api_key_audit` and returns recent
events for diagnostics and future administrative tools. Audit records store key
ids and event metadata only; they do not store raw API key secrets.
Commands requiring authorization:
- writes,
@@ -620,6 +706,20 @@ Commands requiring authorization:
- worker shutdown diagnostics,
- metadata queries if they expose sensitive plant structure.
Current gRPC scope mapping:
- `OpenSession` requires `session:open`.
- `CloseSession` requires `session:close`.
- `StreamEvents` and `DrainEvents` require `events:read`.
- read-style MXAccess commands such as `Register`, `AddItem`, `Advise`, and
`Ping` require `invoke:read`.
- `Write` and `Write2` require `invoke:write`.
- `WriteSecured`, `WriteSecured2`, and `AuthenticateUser` require
`invoke:secure`.
- metadata commands such as `ArchestrAUserToId`, `GetSessionState`, and
`GetWorkerInfo` require `metadata:read`.
- `ShutdownWorker` requires `admin`.
### Worker IPC
Named pipes should be local only. Pipe ACLs should restrict access to:
@@ -664,6 +764,26 @@ Metrics:
Do not log credential values or full tag values by default.
The gateway registers `GatewayMetrics` as the in-process metrics foundation.
It emits .NET `Meter` instruments for collectors and keeps a
`GatewayMetricsSnapshot` for dashboard projection. The snapshot exists because
the dashboard needs current counters and queue depths without depending on a
specific metrics exporter.
HTTP request handling uses `UseGatewayRequestLoggingScope()` to attach common
structured log fields when request metadata is present:
- `SessionId`,
- `ClientIdentity`,
- `WorkerProcessId`,
- `CorrelationId`,
- `CommandMethod`.
`GatewayLogRedactor` redacts API key secrets and command values before they are
added to log state. Value logging remains opt-in and redacted by default so
secured writes, authentication commands, and ordinary tag values do not leak
through diagnostics.
## Configuration
Suggested configuration shape:
@@ -710,6 +830,18 @@ Suggested configuration shape:
Do not scatter connection or path constants through implementation code.
`MxGateway.Server` binds this section to `GatewayOptions` at startup and
registers validation with `ValidateOnStart()`. Startup fails before the gateway
begins serving traffic when required authentication settings are missing,
timeouts or queue sizes are not positive, dashboard settings are malformed, or
the configured worker protocol version does not match the contract version.
The gateway exposes read-only effective settings through
`IGatewayConfigurationProvider`. This projection is for dashboard settings and
diagnostics, so it redacts secret-related fields such as
`Authentication:PepperSecretName` and does not include raw API keys or key
material.
## Galaxy Repository Metadata
Galaxy hierarchy and tag metadata can be discovered through SQL Server when
@@ -730,6 +862,9 @@ workers and fake transports.
Focused tests:
- session state transitions,
- gRPC API-key authentication for unary and streaming calls,
- gRPC scope mapping for sessions, invokes, events, metadata, and admin
commands,
- worker startup failures,
- protocol version mismatch,
- malformed frame handling,
+65
View File
@@ -26,6 +26,33 @@ Style guides:
- [C# Style Guide](./style-guides/CSharpStyleGuide.md)
- [Protobuf Style Guide](./style-guides/ProtobufStyleGuide.md)
## Build And Test
Build the SDK-style worker project with the .NET SDK MSBuild entry point. The
project targets .NET Framework 4.8, but the SDK resolver comes from the .NET SDK
installation:
```powershell
dotnet msbuild src\MxGateway.Worker\MxGateway.Worker.csproj /restore /p:Configuration=Debug /p:Platform=x86
```
`docs/toolchain-links.md` records the Visual Studio MSBuild executable for
classic .NET Framework and COM interop builds:
```powershell
& "C:\Program Files (x86)\Microsoft Visual Studio\2022\BuildTools\MSBuild\Current\Bin\MSBuild.exe" src\MxGateway.Worker\MxGateway.Worker.csproj /p:Configuration=Debug /p:Platform=x86
```
Run the worker tests with the same platform target:
```powershell
dotnet test src\MxGateway.Worker.Tests\MxGateway.Worker.Tests.csproj -p:Platform=x86
```
The only MXAccess interop reference belongs in `MxGateway.Worker`. Gateway and
test projects may reference the worker project for metadata and scaffold tests,
but they must not reference `ArchestrA.MXAccess.dll` directly.
## Responsibilities
The worker owns:
@@ -87,6 +114,21 @@ Startup sequence:
If validation fails before MXAccess creation, exit quickly with a non-zero exit
code. If MXAccess creation fails, send `WorkerFault` when possible and exit.
The bootstrap layer returns structured exit codes before it creates pipes,
starts the STA, or touches MXAccess:
| Exit code | Name | Meaning |
|-----------|------|---------|
| `0` | `Success` | Required bootstrap options are valid. |
| `1` | `UnexpectedFailure` | A non-bootstrap exception reaches the process boundary. |
| `2` | `InvalidArguments` | Required arguments are missing or unknown arguments are present. |
| `3` | `InvalidProtocolVersion` | `--protocol-version` is not numeric or does not match the supported worker protocol. |
| `4` | `MissingNonce` | `MXGATEWAY_WORKER_NONCE` is absent or empty. |
Bootstrap logs use `WorkerConsoleLogger` key/value output. `WorkerLogRedactor`
redacts fields whose names indicate nonce, secret, password, token,
credential, or API key values before the message is written.
## Internal Components
```text
@@ -208,6 +250,17 @@ The loop should update a heartbeat timestamp after:
- finishing a command,
- processing an MXAccess event.
`StaRuntime` implements this runtime boundary in the worker. It starts one
background thread named `MxGateway.Worker.STA`, sets it to `ApartmentState.STA`,
initializes COM through `StaComApartmentInitializer`, and runs
`StaMessagePump`. Commands are scheduled through `InvokeAsync`; the command
queue signals an `AutoResetEvent` so `MsgWaitForMultipleObjectsEx` can wake the
STA without busy-waiting. `LastActivityUtc` records pump, command, startup, and
shutdown activity so the future heartbeat/watchdog can report whether the STA
is still responsive. Shutdown marks the runtime as closing, wakes the pump,
rejects new commands, cancels queued work, uninitializes COM on the STA, and
waits for the thread to exit.
## COM Creation
The MXAccess analysis source at `C:\Users\dohertj2\Desktop\mxaccess` identifies
@@ -236,6 +289,13 @@ The worker should reference the interop assembly and instantiate
`LMXProxyServerClass` on the dedicated STA thread. Keep the ProgID and assembly
path configurable for diagnostics, but this COM class is the v1 default.
`MxAccessStaSession` owns the initial COM creation path. It starts `StaRuntime`,
creates `LMXProxyServerClass` through `MxAccessComObjectFactory` on the STA,
attaches `MxAccessBaseEventSink`, and returns `WorkerReady` only after those
steps succeed. `MxAccessSession` keeps the raw COM object private, records the
STA managed thread id that created it, detaches the base event sink during
disposal, and releases the COM reference on the STA.
Creation rules:
- Create COM object only on the STA.
@@ -253,6 +313,11 @@ If COM creation fails, the worker should send a structured fault with:
- worker process id,
- session id.
`WorkerPipeSession` maps startup exceptions from this path to
`WorkerFaultCategory.MxaccessCreationFailed`, includes the captured HRESULT
when the exception exposes one, and does not send `WorkerReady` after a failed
COM creation attempt.
## Event Sink
The worker must subscribe to every public MXAccess event family:
+24 -3
View File
@@ -45,6 +45,10 @@ Detailed follow-up docs:
- `docs/gateway-process-design.md` covers the .NET 10 gateway process,
session manager, worker supervision, gRPC API, event streaming, fault model,
security, observability, and test strategy.
- `docs/WorkerFrameProtocol.md` covers the gateway-side named-pipe frame
reader/writer and `WorkerEnvelope` validation rules.
- `docs/WorkerProcessLauncher.md` covers worker executable validation, process
launch arguments, nonce handling, and startup cleanup behavior.
- `docs/mxaccess-worker-instance-design.md` covers each .NET Framework 4.8 x86
MXAccess worker instance, including STA ownership, message pumping, COM
lifetime, command dispatch, event sinks, conversion, and shutdown.
@@ -97,6 +101,13 @@ Responsibilities:
The gateway must never instantiate or call MXAccess directly.
The gateway observability foundation lives in `MxGateway.Server.Diagnostics`
and `MxGateway.Server.Metrics`. Structured logging scopes carry session,
worker, correlation, command, and client identity fields with redaction applied
before values enter log state. `GatewayMetrics` exposes counters, gauges, and
histograms through .NET `Meter` and a snapshot API that dashboard services can
project without binding to a metrics exporter.
### Worker Process
Runtime:
@@ -555,9 +566,13 @@ Because each client owns one worker, a crash or leak affects only that session.
External gateway:
- use TLS for remote gRPC if crossing machine boundaries,
- authenticate clients with Windows auth, mTLS, or a deployment-specific token,
- authorize access to commands that can write, authenticate users, or alter
runtime state.
- authenticate v1 gRPC clients with `authorization: Bearer
mxgw_<key-id>_<secret>` API-key metadata,
- reject missing or invalid API keys with gRPC `Unauthenticated`,
- reject valid keys that lack the required session, invoke, event, metadata, or
admin scope with gRPC `PermissionDenied`,
- authorize access to commands that can write, authenticate users, expose
metadata, stream events, or alter runtime state.
Internal worker IPC:
@@ -784,6 +799,12 @@ Core operations:
- track worker state,
- close or kill worker.
The gateway implementation keeps sessions in an in-memory `SessionRegistry`
keyed by session id. `SessionManager` owns the state machine, creates
per-session pipe names and nonces, starts the worker through the worker-client
factory, gates commands to `Ready` sessions, exposes lease-close hooks, and
cleans up workers during gateway shutdown.
State machine:
```text
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<TargetFrameworks>net10.0;net48</TargetFrameworks>
</PropertyGroup>
<ItemGroup>
@@ -17,6 +17,7 @@
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="System.Runtime.CompilerServices.Unsafe" Version="6.1.2" />
</ItemGroup>
</Project>
@@ -0,0 +1,7 @@
namespace MxGateway.Server.Configuration;
public enum AuthenticationMode
{
ApiKey,
Disabled
}
@@ -0,0 +1,12 @@
namespace MxGateway.Server.Configuration;
public sealed class AuthenticationOptions
{
public AuthenticationMode Mode { get; init; } = AuthenticationMode.ApiKey;
public string SqlitePath { get; init; } = @"C:\ProgramData\MxGateway\gateway-auth.db";
public string PepperSecretName { get; init; } = "MxGateway:ApiKeyPepper";
public bool RunMigrationsOnStartup { get; init; } = true;
}
@@ -0,0 +1,20 @@
namespace MxGateway.Server.Configuration;
public sealed class DashboardOptions
{
public bool Enabled { get; init; } = true;
public string PathBase { get; init; } = "/dashboard";
public bool RequireAdminScope { get; init; } = true;
public bool AllowAnonymousLocalhost { get; init; }
public int SnapshotIntervalMilliseconds { get; init; } = 1_000;
public int RecentFaultLimit { get; init; } = 100;
public int RecentSessionLimit { get; init; } = 200;
public bool ShowTagValues { get; init; }
}
@@ -0,0 +1,7 @@
namespace MxGateway.Server.Configuration;
public sealed record EffectiveAuthenticationConfiguration(
string Mode,
string SqlitePath,
string PepperSecretName,
bool RunMigrationsOnStartup);
@@ -0,0 +1,11 @@
namespace MxGateway.Server.Configuration;
public sealed record EffectiveDashboardConfiguration(
bool Enabled,
string PathBase,
bool RequireAdminScope,
bool AllowAnonymousLocalhost,
int SnapshotIntervalMilliseconds,
int RecentFaultLimit,
int RecentSessionLimit,
bool ShowTagValues);
@@ -0,0 +1,5 @@
namespace MxGateway.Server.Configuration;
public sealed record EffectiveEventConfiguration(
int QueueCapacity,
string BackpressurePolicy);
@@ -0,0 +1,9 @@
namespace MxGateway.Server.Configuration;
public sealed record EffectiveGatewayConfiguration(
EffectiveAuthenticationConfiguration Authentication,
EffectiveWorkerConfiguration Worker,
EffectiveSessionConfiguration Sessions,
EffectiveEventConfiguration Events,
EffectiveDashboardConfiguration Dashboard,
EffectiveProtocolConfiguration Protocol);
@@ -0,0 +1,3 @@
namespace MxGateway.Server.Configuration;
public sealed record EffectiveProtocolConfiguration(uint WorkerProtocolVersion);
@@ -0,0 +1,6 @@
namespace MxGateway.Server.Configuration;
public sealed record EffectiveSessionConfiguration(
int DefaultCommandTimeoutSeconds,
int MaxSessions,
bool AllowMultipleEventSubscribers);
@@ -0,0 +1,11 @@
namespace MxGateway.Server.Configuration;
public sealed record EffectiveWorkerConfiguration(
string ExecutablePath,
string? WorkingDirectory,
string RequiredArchitecture,
int StartupTimeoutSeconds,
int ShutdownTimeoutSeconds,
int HeartbeatIntervalSeconds,
int HeartbeatGraceSeconds,
int MaxMessageBytes);
@@ -0,0 +1,6 @@
namespace MxGateway.Server.Configuration;
public enum EventBackpressurePolicy
{
FailFast
}
@@ -0,0 +1,8 @@
namespace MxGateway.Server.Configuration;
public sealed class EventOptions
{
public int QueueCapacity { get; init; } = 10_000;
public EventBackpressurePolicy BackpressurePolicy { get; init; } = EventBackpressurePolicy.FailFast;
}
@@ -0,0 +1,46 @@
using Microsoft.Extensions.Options;
namespace MxGateway.Server.Configuration;
public sealed class GatewayConfigurationProvider(IOptions<GatewayOptions> options) : IGatewayConfigurationProvider
{
public const string RedactedValue = "[redacted]";
public EffectiveGatewayConfiguration GetEffectiveConfiguration()
{
GatewayOptions value = options.Value;
return new EffectiveGatewayConfiguration(
Authentication: new EffectiveAuthenticationConfiguration(
Mode: value.Authentication.Mode.ToString(),
SqlitePath: value.Authentication.SqlitePath,
PepperSecretName: RedactedValue,
RunMigrationsOnStartup: value.Authentication.RunMigrationsOnStartup),
Worker: new EffectiveWorkerConfiguration(
ExecutablePath: value.Worker.ExecutablePath,
WorkingDirectory: value.Worker.WorkingDirectory,
RequiredArchitecture: value.Worker.RequiredArchitecture.ToString(),
StartupTimeoutSeconds: value.Worker.StartupTimeoutSeconds,
ShutdownTimeoutSeconds: value.Worker.ShutdownTimeoutSeconds,
HeartbeatIntervalSeconds: value.Worker.HeartbeatIntervalSeconds,
HeartbeatGraceSeconds: value.Worker.HeartbeatGraceSeconds,
MaxMessageBytes: value.Worker.MaxMessageBytes),
Sessions: new EffectiveSessionConfiguration(
DefaultCommandTimeoutSeconds: value.Sessions.DefaultCommandTimeoutSeconds,
MaxSessions: value.Sessions.MaxSessions,
AllowMultipleEventSubscribers: value.Sessions.AllowMultipleEventSubscribers),
Events: new EffectiveEventConfiguration(
QueueCapacity: value.Events.QueueCapacity,
BackpressurePolicy: value.Events.BackpressurePolicy.ToString()),
Dashboard: new EffectiveDashboardConfiguration(
Enabled: value.Dashboard.Enabled,
PathBase: value.Dashboard.PathBase,
RequireAdminScope: value.Dashboard.RequireAdminScope,
AllowAnonymousLocalhost: value.Dashboard.AllowAnonymousLocalhost,
SnapshotIntervalMilliseconds: value.Dashboard.SnapshotIntervalMilliseconds,
RecentFaultLimit: value.Dashboard.RecentFaultLimit,
RecentSessionLimit: value.Dashboard.RecentSessionLimit,
ShowTagValues: value.Dashboard.ShowTagValues),
Protocol: new EffectiveProtocolConfiguration(value.Protocol.WorkerProtocolVersion));
}
}
@@ -0,0 +1,19 @@
using Microsoft.Extensions.Options;
namespace MxGateway.Server.Configuration;
public static class GatewayConfigurationServiceCollectionExtensions
{
public static IServiceCollection AddGatewayConfiguration(this IServiceCollection services)
{
services
.AddOptions<GatewayOptions>()
.BindConfiguration(GatewayOptions.SectionName)
.ValidateOnStart();
services.AddSingleton<IValidateOptions<GatewayOptions>, GatewayOptionsValidator>();
services.AddSingleton<IGatewayConfigurationProvider, GatewayConfigurationProvider>();
return services;
}
}
@@ -0,0 +1,18 @@
namespace MxGateway.Server.Configuration;
public sealed class GatewayOptions
{
public const string SectionName = "MxGateway";
public AuthenticationOptions Authentication { get; init; } = new();
public WorkerOptions Worker { get; init; } = new();
public SessionOptions Sessions { get; init; } = new();
public EventOptions Events { get; init; } = new();
public DashboardOptions Dashboard { get; init; } = new();
public ProtocolOptions Protocol { get; init; } = new();
}
@@ -0,0 +1,210 @@
using Microsoft.Extensions.Options;
using MxGateway.Contracts;
namespace MxGateway.Server.Configuration;
public sealed class GatewayOptionsValidator : IValidateOptions<GatewayOptions>
{
private const int MinimumMaxMessageBytes = 1024;
private const int MaximumMaxMessageBytes = 256 * 1024 * 1024;
public ValidateOptionsResult Validate(string? name, GatewayOptions options)
{
List<string> failures = [];
ValidateAuthentication(options.Authentication, failures);
ValidateWorker(options.Worker, failures);
ValidateSessions(options.Sessions, failures);
ValidateEvents(options.Events, failures);
ValidateDashboard(options.Dashboard, failures);
ValidateProtocol(options.Protocol, failures);
return failures.Count == 0
? ValidateOptionsResult.Success
: ValidateOptionsResult.Fail(failures);
}
private static void ValidateAuthentication(AuthenticationOptions options, List<string> failures)
{
if (!Enum.IsDefined(options.Mode))
{
failures.Add("MxGateway:Authentication:Mode must be a supported authentication mode.");
return;
}
if (options.Mode == AuthenticationMode.ApiKey)
{
AddIfBlank(
options.SqlitePath,
"MxGateway:Authentication:SqlitePath is required when API-key authentication is enabled.",
failures);
AddIfInvalidPath(
options.SqlitePath,
"MxGateway:Authentication:SqlitePath must be a valid filesystem path.",
failures);
AddIfBlank(
options.PepperSecretName,
"MxGateway:Authentication:PepperSecretName is required when API-key authentication is enabled.",
failures);
}
}
private static void ValidateWorker(WorkerOptions options, List<string> failures)
{
AddIfBlank(options.ExecutablePath, "MxGateway:Worker:ExecutablePath is required.", failures);
AddIfInvalidPath(
options.ExecutablePath,
"MxGateway:Worker:ExecutablePath must be a valid filesystem path.",
failures);
if (!string.IsNullOrWhiteSpace(options.ExecutablePath)
&& !string.Equals(Path.GetExtension(options.ExecutablePath), ".exe", StringComparison.OrdinalIgnoreCase))
{
failures.Add("MxGateway:Worker:ExecutablePath must point to a .exe file.");
}
if (!string.IsNullOrWhiteSpace(options.WorkingDirectory))
{
AddIfInvalidPath(
options.WorkingDirectory,
"MxGateway:Worker:WorkingDirectory must be a valid filesystem path.",
failures);
}
if (!Enum.IsDefined(options.RequiredArchitecture))
{
failures.Add("MxGateway:Worker:RequiredArchitecture must be a supported worker architecture.");
}
AddIfNotPositive(
options.StartupTimeoutSeconds,
"MxGateway:Worker:StartupTimeoutSeconds must be greater than zero.",
failures);
AddIfNotPositive(
options.ShutdownTimeoutSeconds,
"MxGateway:Worker:ShutdownTimeoutSeconds must be greater than zero.",
failures);
AddIfNotPositive(
options.HeartbeatIntervalSeconds,
"MxGateway:Worker:HeartbeatIntervalSeconds must be greater than zero.",
failures);
AddIfNotPositive(
options.HeartbeatGraceSeconds,
"MxGateway:Worker:HeartbeatGraceSeconds must be greater than zero.",
failures);
if (options.HeartbeatGraceSeconds < options.HeartbeatIntervalSeconds)
{
failures.Add(
"MxGateway:Worker:HeartbeatGraceSeconds must be greater than or equal to HeartbeatIntervalSeconds.");
}
if (options.MaxMessageBytes is < MinimumMaxMessageBytes or > MaximumMaxMessageBytes)
{
failures.Add(
$"MxGateway:Worker:MaxMessageBytes must be between {MinimumMaxMessageBytes} and {MaximumMaxMessageBytes}.");
}
}
private static void ValidateSessions(SessionOptions options, List<string> failures)
{
AddIfNotPositive(
options.DefaultCommandTimeoutSeconds,
"MxGateway:Sessions:DefaultCommandTimeoutSeconds must be greater than zero.",
failures);
AddIfNotPositive(options.MaxSessions, "MxGateway:Sessions:MaxSessions must be greater than zero.", failures);
}
private static void ValidateEvents(EventOptions options, List<string> failures)
{
AddIfNotPositive(options.QueueCapacity, "MxGateway:Events:QueueCapacity must be greater than zero.", failures);
if (!Enum.IsDefined(options.BackpressurePolicy))
{
failures.Add("MxGateway:Events:BackpressurePolicy must be a supported backpressure policy.");
}
}
private static void ValidateDashboard(DashboardOptions options, List<string> failures)
{
if (options.Enabled)
{
AddIfBlank(options.PathBase, "MxGateway:Dashboard:PathBase is required when the dashboard is enabled.", failures);
if (!string.IsNullOrWhiteSpace(options.PathBase) && !options.PathBase.StartsWith('/'))
{
failures.Add("MxGateway:Dashboard:PathBase must start with '/'.");
}
}
AddIfNotPositive(
options.SnapshotIntervalMilliseconds,
"MxGateway:Dashboard:SnapshotIntervalMilliseconds must be greater than zero.",
failures);
AddIfNegative(
options.RecentFaultLimit,
"MxGateway:Dashboard:RecentFaultLimit must be greater than or equal to zero.",
failures);
AddIfNegative(
options.RecentSessionLimit,
"MxGateway:Dashboard:RecentSessionLimit must be greater than or equal to zero.",
failures);
}
private static void ValidateProtocol(ProtocolOptions options, List<string> failures)
{
if (options.WorkerProtocolVersion != GatewayContractInfo.WorkerProtocolVersion)
{
failures.Add(
$"MxGateway:Protocol:WorkerProtocolVersion must be {GatewayContractInfo.WorkerProtocolVersion}.");
}
}
private static void AddIfBlank(string? value, string message, List<string> failures)
{
if (string.IsNullOrWhiteSpace(value))
{
failures.Add(message);
}
}
private static void AddIfNotPositive(int value, string message, List<string> failures)
{
if (value <= 0)
{
failures.Add(message);
}
}
private static void AddIfNegative(int value, string message, List<string> failures)
{
if (value < 0)
{
failures.Add(message);
}
}
private static void AddIfInvalidPath(string? value, string message, List<string> failures)
{
if (string.IsNullOrWhiteSpace(value))
{
return;
}
try
{
_ = Path.GetFullPath(value);
}
catch (ArgumentException)
{
failures.Add(message);
}
catch (NotSupportedException)
{
failures.Add(message);
}
catch (PathTooLongException)
{
failures.Add(message);
}
}
}
@@ -0,0 +1,6 @@
namespace MxGateway.Server.Configuration;
public interface IGatewayConfigurationProvider
{
EffectiveGatewayConfiguration GetEffectiveConfiguration();
}
@@ -0,0 +1,8 @@
using MxGateway.Contracts;
namespace MxGateway.Server.Configuration;
public sealed class ProtocolOptions
{
public uint WorkerProtocolVersion { get; init; } = GatewayContractInfo.WorkerProtocolVersion;
}
@@ -0,0 +1,10 @@
namespace MxGateway.Server.Configuration;
public sealed class SessionOptions
{
public int DefaultCommandTimeoutSeconds { get; init; } = 30;
public int MaxSessions { get; init; } = 64;
public bool AllowMultipleEventSubscribers { get; init; }
}
@@ -0,0 +1,7 @@
namespace MxGateway.Server.Configuration;
public enum WorkerArchitecture
{
X86,
X64
}
@@ -0,0 +1,21 @@
namespace MxGateway.Server.Configuration;
public sealed class WorkerOptions
{
public string ExecutablePath { get; init; } =
@"src\MxGateway.Worker\bin\x86\Release\MxGateway.Worker.exe";
public string? WorkingDirectory { get; init; }
public WorkerArchitecture RequiredArchitecture { get; init; } = WorkerArchitecture.X86;
public int StartupTimeoutSeconds { get; init; } = 30;
public int ShutdownTimeoutSeconds { get; init; } = 10;
public int HeartbeatIntervalSeconds { get; init; } = 5;
public int HeartbeatGraceSeconds { get; init; } = 15;
public int MaxMessageBytes { get; init; } = 16 * 1024 * 1024;
}
@@ -0,0 +1,78 @@
namespace MxGateway.Server.Diagnostics;
public static class GatewayLogRedactor
{
public const string RedactedValue = "[redacted]";
private static readonly HashSet<string> SensitiveCommandMethods = new(StringComparer.OrdinalIgnoreCase)
{
"AuthenticateUser",
"WriteSecured",
"WriteSecured2"
};
public static bool IsCredentialBearingCommand(string? commandMethod)
{
return commandMethod is not null
&& SensitiveCommandMethods.Contains(commandMethod);
}
public static string? RedactApiKey(string? authorizationHeader)
{
if (string.IsNullOrWhiteSpace(authorizationHeader))
{
return authorizationHeader;
}
const string bearerPrefix = "Bearer ";
if (!authorizationHeader.StartsWith(bearerPrefix, StringComparison.OrdinalIgnoreCase))
{
return RedactedValue;
}
string token = authorizationHeader[bearerPrefix.Length..].Trim();
if (!token.StartsWith("mxgw_", StringComparison.OrdinalIgnoreCase))
{
return $"{bearerPrefix}{RedactedValue}";
}
string[] tokenParts = token.Split('_', 3, StringSplitOptions.RemoveEmptyEntries);
if (tokenParts.Length < 2)
{
return $"{bearerPrefix}mxgw_{RedactedValue}";
}
return $"{bearerPrefix}mxgw_{tokenParts[1]}_{RedactedValue}";
}
public static string? RedactClientIdentity(string? clientIdentity)
{
if (string.IsNullOrWhiteSpace(clientIdentity))
{
return clientIdentity;
}
return clientIdentity.Contains("mxgw_", StringComparison.OrdinalIgnoreCase)
? RedactApiKey(clientIdentity)
: clientIdentity;
}
public static object? RedactCommandValue(
string? commandMethod,
object? value,
bool valueLoggingEnabled = false)
{
if (value is null)
{
return null;
}
if (!valueLoggingEnabled || IsCredentialBearingCommand(commandMethod))
{
return RedactedValue;
}
return value;
}
}
@@ -0,0 +1,33 @@
namespace MxGateway.Server.Diagnostics;
public sealed record GatewayLogScope(
string? SessionId = null,
int? WorkerProcessId = null,
ulong? CorrelationId = null,
string? CommandMethod = null,
string? ClientIdentity = null)
{
public IReadOnlyDictionary<string, object?> ToDictionary()
{
Dictionary<string, object?> values = [];
AddIfPresent(values, "SessionId", SessionId);
AddIfPresent(values, "WorkerProcessId", WorkerProcessId);
AddIfPresent(values, "CorrelationId", CorrelationId);
AddIfPresent(values, "CommandMethod", CommandMethod);
AddIfPresent(values, "ClientIdentity", GatewayLogRedactor.RedactClientIdentity(ClientIdentity));
return values;
}
private static void AddIfPresent(
Dictionary<string, object?> values,
string key,
object? value)
{
if (value is not null)
{
values[key] = value;
}
}
}
@@ -0,0 +1,16 @@
using Microsoft.Extensions.Logging;
namespace MxGateway.Server.Diagnostics;
public static class GatewayLoggerExtensions
{
public static IDisposable? BeginGatewayScope(
this ILogger logger,
GatewayLogScope scope)
{
ArgumentNullException.ThrowIfNull(logger);
ArgumentNullException.ThrowIfNull(scope);
return logger.BeginScope(scope.ToDictionary());
}
}
@@ -0,0 +1,57 @@
using Microsoft.Extensions.Primitives;
namespace MxGateway.Server.Diagnostics;
public static class GatewayRequestLoggingMiddlewareExtensions
{
public const string SessionIdHeaderName = "x-session-id";
public const string WorkerProcessIdHeaderName = "x-worker-process-id";
public const string CorrelationIdHeaderName = "x-correlation-id";
public const string CommandMethodHeaderName = "x-command-method";
public static IApplicationBuilder UseGatewayRequestLoggingScope(this IApplicationBuilder app)
{
ArgumentNullException.ThrowIfNull(app);
return app.Use(async (context, next) =>
{
ILogger logger = context.RequestServices
.GetRequiredService<ILoggerFactory>()
.CreateLogger("MxGateway.Request");
using IDisposable? scope = logger.BeginGatewayScope(new GatewayLogScope(
SessionId: ReadHeader(context, SessionIdHeaderName),
WorkerProcessId: ReadInt32Header(context, WorkerProcessIdHeaderName),
CorrelationId: ReadUInt64Header(context, CorrelationIdHeaderName),
CommandMethod: ReadHeader(context, CommandMethodHeaderName),
ClientIdentity: ReadHeader(context, "authorization")));
await next(context);
});
}
private static string? ReadHeader(HttpContext context, string headerName)
{
return context.Request.Headers.TryGetValue(headerName, out StringValues values)
? values.ToString()
: null;
}
private static int? ReadInt32Header(HttpContext context, string headerName)
{
string? value = ReadHeader(context, headerName);
return int.TryParse(value, out int parsedValue)
? parsedValue
: null;
}
private static ulong? ReadUInt64Header(HttpContext context, string headerName)
{
string? value = ReadHeader(context, headerName);
return ulong.TryParse(value, out ulong parsedValue)
? parsedValue
: null;
}
}
@@ -1,4 +1,11 @@
using MxGateway.Contracts;
using MxGateway.Server.Configuration;
using MxGateway.Server.Diagnostics;
using MxGateway.Server.Metrics;
using MxGateway.Server.Security.Authentication;
using MxGateway.Server.Security.Authorization;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
namespace MxGateway.Server;
@@ -9,6 +16,7 @@ public static class GatewayApplication
WebApplicationBuilder builder = CreateBuilder(args);
WebApplication app = builder.Build();
app.UseGatewayRequestLoggingScope();
app.MapGatewayEndpoints();
return app;
@@ -18,7 +26,13 @@ public static class GatewayApplication
{
WebApplicationBuilder builder = WebApplication.CreateBuilder(args);
builder.Services.AddGatewayConfiguration();
builder.Services.AddSqliteAuthStore();
builder.Services.AddGatewayGrpcAuthorization();
builder.Services.AddHealthChecks();
builder.Services.AddSingleton<GatewayMetrics>();
builder.Services.AddWorkerProcessLauncher();
builder.Services.AddGatewaySessions();
return builder;
}
@@ -0,0 +1,306 @@
using System.Diagnostics.Metrics;
namespace MxGateway.Server.Metrics;
public sealed class GatewayMetrics : IDisposable
{
public const string MeterName = "MxGateway.Server";
private readonly object _syncRoot = new();
private readonly Meter _meter;
private readonly Counter<long> _sessionsOpenedCounter;
private readonly Counter<long> _sessionsClosedCounter;
private readonly Counter<long> _commandsStartedCounter;
private readonly Counter<long> _commandsSucceededCounter;
private readonly Counter<long> _commandsFailedCounter;
private readonly Counter<long> _eventsReceivedCounter;
private readonly Counter<long> _queueOverflowsCounter;
private readonly Counter<long> _faultsCounter;
private readonly Counter<long> _workerKillsCounter;
private readonly Counter<long> _workerExitsCounter;
private readonly Counter<long> _heartbeatFailuresCounter;
private readonly Counter<long> _streamDisconnectsCounter;
private readonly Histogram<double> _workerStartupLatencyHistogram;
private readonly Histogram<double> _commandLatencyHistogram;
private readonly Histogram<double> _eventStreamSendLatencyHistogram;
private readonly Dictionary<string, long> _commandFailuresByMethod = new(StringComparer.OrdinalIgnoreCase);
private readonly Dictionary<string, long> _eventsByFamily = new(StringComparer.OrdinalIgnoreCase);
private int _openSessions;
private int _workersRunning;
private int _eventQueueDepth;
private long _sessionsOpened;
private long _sessionsClosed;
private long _commandsStarted;
private long _commandsSucceeded;
private long _commandsFailed;
private long _eventsReceived;
private long _queueOverflows;
private long _faults;
private long _workerKills;
private long _workerExits;
private long _heartbeatFailures;
private long _streamDisconnects;
private bool _disposed;
public GatewayMetrics()
{
_meter = new Meter(MeterName, typeof(GatewayMetrics).Assembly.GetName().Version?.ToString());
_sessionsOpenedCounter = _meter.CreateCounter<long>("mxgateway.sessions.opened");
_sessionsClosedCounter = _meter.CreateCounter<long>("mxgateway.sessions.closed");
_commandsStartedCounter = _meter.CreateCounter<long>("mxgateway.commands.started");
_commandsSucceededCounter = _meter.CreateCounter<long>("mxgateway.commands.succeeded");
_commandsFailedCounter = _meter.CreateCounter<long>("mxgateway.commands.failed");
_eventsReceivedCounter = _meter.CreateCounter<long>("mxgateway.events.received");
_queueOverflowsCounter = _meter.CreateCounter<long>("mxgateway.queues.overflows");
_faultsCounter = _meter.CreateCounter<long>("mxgateway.faults");
_workerKillsCounter = _meter.CreateCounter<long>("mxgateway.workers.killed");
_workerExitsCounter = _meter.CreateCounter<long>("mxgateway.workers.exited");
_heartbeatFailuresCounter = _meter.CreateCounter<long>("mxgateway.heartbeats.failed");
_streamDisconnectsCounter = _meter.CreateCounter<long>("mxgateway.grpc.streams.disconnected");
_workerStartupLatencyHistogram = _meter.CreateHistogram<double>("mxgateway.workers.startup.duration", "ms");
_commandLatencyHistogram = _meter.CreateHistogram<double>("mxgateway.commands.duration", "ms");
_eventStreamSendLatencyHistogram = _meter.CreateHistogram<double>("mxgateway.events.stream_send.duration", "ms");
_meter.CreateObservableGauge("mxgateway.sessions.open", GetOpenSessions);
_meter.CreateObservableGauge("mxgateway.workers.running", GetWorkersRunning);
_meter.CreateObservableGauge("mxgateway.events.queue.depth", GetEventQueueDepth);
}
public void SessionOpened()
{
lock (_syncRoot)
{
_openSessions++;
_sessionsOpened++;
}
_sessionsOpenedCounter.Add(1);
}
public void SessionClosed()
{
lock (_syncRoot)
{
if (_openSessions > 0)
{
_openSessions--;
}
_sessionsClosed++;
}
_sessionsClosedCounter.Add(1);
}
public void WorkerStarted(TimeSpan startupDuration)
{
lock (_syncRoot)
{
_workersRunning++;
}
_workerStartupLatencyHistogram.Record(startupDuration.TotalMilliseconds);
}
public void WorkerStopped(string reason)
{
lock (_syncRoot)
{
if (_workersRunning > 0)
{
_workersRunning--;
}
_workerExits++;
}
_workerExitsCounter.Add(1, new KeyValuePair<string, object?>("reason", reason));
}
public void WorkerKilled(string reason)
{
lock (_syncRoot)
{
_workerKills++;
}
_workerKillsCounter.Add(1, new KeyValuePair<string, object?>("reason", reason));
}
public void CommandStarted(string method)
{
lock (_syncRoot)
{
_commandsStarted++;
}
_commandsStartedCounter.Add(1, new KeyValuePair<string, object?>("method", method));
}
public void CommandSucceeded(string method, TimeSpan duration)
{
lock (_syncRoot)
{
_commandsSucceeded++;
}
KeyValuePair<string, object?> methodTag = new("method", method);
_commandsSucceededCounter.Add(1, methodTag);
_commandLatencyHistogram.Record(duration.TotalMilliseconds, methodTag);
}
public void CommandFailed(string method, string category, TimeSpan duration)
{
lock (_syncRoot)
{
_commandsFailed++;
Increment(_commandFailuresByMethod, method);
}
KeyValuePair<string, object?> methodTag = new("method", method);
KeyValuePair<string, object?> categoryTag = new("category", category);
_commandsFailedCounter.Add(1, methodTag, categoryTag);
_commandLatencyHistogram.Record(duration.TotalMilliseconds, methodTag, categoryTag);
}
public void EventReceived(string sessionId, string family)
{
lock (_syncRoot)
{
_eventsReceived++;
Increment(_eventsByFamily, family);
}
_eventsReceivedCounter.Add(
1,
new KeyValuePair<string, object?>("session_id", sessionId),
new KeyValuePair<string, object?>("family", family));
}
public void RecordEventStreamSend(string family, TimeSpan duration)
{
_eventStreamSendLatencyHistogram.Record(
duration.TotalMilliseconds,
new KeyValuePair<string, object?>("family", family));
}
public void SetEventQueueDepth(int depth)
{
if (depth < 0)
{
throw new ArgumentOutOfRangeException(nameof(depth), depth, "Queue depth cannot be negative.");
}
lock (_syncRoot)
{
_eventQueueDepth = depth;
}
}
public void QueueOverflow(string queueName)
{
lock (_syncRoot)
{
_queueOverflows++;
}
_queueOverflowsCounter.Add(1, new KeyValuePair<string, object?>("queue", queueName));
}
public void Fault(string category)
{
lock (_syncRoot)
{
_faults++;
}
_faultsCounter.Add(1, new KeyValuePair<string, object?>("category", category));
}
public void HeartbeatFailed(string sessionId)
{
lock (_syncRoot)
{
_heartbeatFailures++;
}
_heartbeatFailuresCounter.Add(1, new KeyValuePair<string, object?>("session_id", sessionId));
}
public void StreamDisconnected(string reason)
{
lock (_syncRoot)
{
_streamDisconnects++;
}
_streamDisconnectsCounter.Add(1, new KeyValuePair<string, object?>("reason", reason));
}
public GatewayMetricsSnapshot GetSnapshot()
{
lock (_syncRoot)
{
return new GatewayMetricsSnapshot(
OpenSessions: _openSessions,
WorkersRunning: _workersRunning,
EventQueueDepth: _eventQueueDepth,
SessionsOpened: _sessionsOpened,
SessionsClosed: _sessionsClosed,
CommandsStarted: _commandsStarted,
CommandsSucceeded: _commandsSucceeded,
CommandsFailed: _commandsFailed,
EventsReceived: _eventsReceived,
QueueOverflows: _queueOverflows,
Faults: _faults,
WorkerKills: _workerKills,
WorkerExits: _workerExits,
HeartbeatFailures: _heartbeatFailures,
StreamDisconnects: _streamDisconnects,
CommandFailuresByMethod: new Dictionary<string, long>(_commandFailuresByMethod, StringComparer.OrdinalIgnoreCase),
EventsByFamily: new Dictionary<string, long>(_eventsByFamily, StringComparer.OrdinalIgnoreCase));
}
}
public void Dispose()
{
if (_disposed)
{
return;
}
_meter.Dispose();
_disposed = true;
}
private int GetOpenSessions()
{
lock (_syncRoot)
{
return _openSessions;
}
}
private int GetWorkersRunning()
{
lock (_syncRoot)
{
return _workersRunning;
}
}
private int GetEventQueueDepth()
{
lock (_syncRoot)
{
return _eventQueueDepth;
}
}
private static void Increment(Dictionary<string, long> values, string key)
{
values.TryGetValue(key, out long currentValue);
values[key] = currentValue + 1;
}
}
@@ -0,0 +1,20 @@
namespace MxGateway.Server.Metrics;
public sealed record GatewayMetricsSnapshot(
int OpenSessions,
int WorkersRunning,
int EventQueueDepth,
long SessionsOpened,
long SessionsClosed,
long CommandsStarted,
long CommandsSucceeded,
long CommandsFailed,
long EventsReceived,
long QueueOverflows,
long Faults,
long WorkerKills,
long WorkerExits,
long HeartbeatFailures,
long StreamDisconnects,
IReadOnlyDictionary<string, long> CommandFailuresByMethod,
IReadOnlyDictionary<string, long> EventsByFamily);
@@ -4,6 +4,11 @@
<TargetFramework>net10.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Grpc.AspNetCore" Version="2.76.0" />
<PackageReference Include="Microsoft.Data.Sqlite" Version="10.0.7" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\MxGateway.Contracts\MxGateway.Contracts.csproj" />
</ItemGroup>
+37 -1
View File
@@ -1,7 +1,43 @@
using MxGateway.Server;
using MxGateway.Server.Configuration;
using MxGateway.Server.Security.Authentication;
var app = GatewayApplication.Build(args);
ApiKeyAdminParseResult apiKeyAdminCommand = ApiKeyAdminCommandLineParser.Parse(args);
if (apiKeyAdminCommand.IsApiKeyCommand)
{
if (apiKeyAdminCommand.Command is null)
{
await Console.Error.WriteLineAsync(apiKeyAdminCommand.Error);
return 2;
}
WebApplicationBuilder builder = GatewayApplication.CreateBuilder([]);
ApplyApiKeyAdminOverrides(builder.Configuration, apiKeyAdminCommand.Command);
await using WebApplication cliApp = builder.Build();
await using AsyncServiceScope scope = cliApp.Services.CreateAsyncScope();
ApiKeyAdminCliRunner runner = scope.ServiceProvider.GetRequiredService<ApiKeyAdminCliRunner>();
return await runner.RunAsync(apiKeyAdminCommand.Command, Console.Out, CancellationToken.None);
}
WebApplication app = GatewayApplication.Build(args);
app.Run();
return 0;
static void ApplyApiKeyAdminOverrides(IConfiguration configuration, ApiKeyAdminCommand command)
{
if (!string.IsNullOrWhiteSpace(command.SqlitePath))
{
configuration[$"{GatewayOptions.SectionName}:Authentication:SqlitePath"] = command.SqlitePath;
}
if (!string.IsNullOrWhiteSpace(command.Pepper))
{
configuration["MxGateway:ApiKeyPepper"] = command.Pepper;
}
}
public partial class Program;
@@ -0,0 +1,180 @@
using System.Text.Json;
namespace MxGateway.Server.Security.Authentication;
public sealed class ApiKeyAdminCliRunner(
IAuthStoreMigrator migrator,
IApiKeyAdminStore adminStore,
IApiKeyAuditStore auditStore,
IApiKeySecretHasher hasher)
{
private static readonly JsonSerializerOptions JsonOptions = new()
{
WriteIndented = true
};
public async Task<int> RunAsync(
ApiKeyAdminCommand command,
TextWriter output,
CancellationToken cancellationToken)
{
ApiKeyAdminOutput result = command.Kind switch
{
ApiKeyAdminCommandKind.InitDb => await InitDbAsync(cancellationToken).ConfigureAwait(false),
ApiKeyAdminCommandKind.CreateKey => await CreateKeyAsync(command, cancellationToken).ConfigureAwait(false),
ApiKeyAdminCommandKind.ListKeys => await ListKeysAsync(cancellationToken).ConfigureAwait(false),
ApiKeyAdminCommandKind.RevokeKey => await RevokeKeyAsync(command, cancellationToken).ConfigureAwait(false),
ApiKeyAdminCommandKind.RotateKey => await RotateKeyAsync(command, cancellationToken).ConfigureAwait(false),
_ => throw new InvalidOperationException($"Unsupported API key command '{command.Kind}'.")
};
await WriteOutputAsync(command, result, output).ConfigureAwait(false);
return 0;
}
private async Task<ApiKeyAdminOutput> InitDbAsync(CancellationToken cancellationToken)
{
await migrator.MigrateAsync(cancellationToken).ConfigureAwait(false);
await AppendAuditAsync(null, "init-db", null, cancellationToken).ConfigureAwait(false);
return new ApiKeyAdminOutput("init-db", "initialized", null, []);
}
private async Task<ApiKeyAdminOutput> CreateKeyAsync(
ApiKeyAdminCommand command,
CancellationToken cancellationToken)
{
await migrator.MigrateAsync(cancellationToken).ConfigureAwait(false);
string keyId = Required(command.KeyId);
string secret = ApiKeySecretGenerator.Generate();
string apiKey = FormatApiKey(keyId, secret);
await adminStore.CreateAsync(
new ApiKeyCreateRequest(
KeyId: keyId,
KeyPrefix: $"mxgw_{keyId}",
SecretHash: hasher.HashSecret(secret),
DisplayName: Required(command.DisplayName),
Scopes: command.Scopes,
CreatedUtc: DateTimeOffset.UtcNow),
cancellationToken)
.ConfigureAwait(false);
await AppendAuditAsync(keyId, "create-key", null, cancellationToken).ConfigureAwait(false);
return new ApiKeyAdminOutput("create-key", "created", apiKey, []);
}
private async Task<ApiKeyAdminOutput> ListKeysAsync(CancellationToken cancellationToken)
{
await migrator.MigrateAsync(cancellationToken).ConfigureAwait(false);
IReadOnlyList<ApiKeyRecord> keys = await adminStore.ListAsync(cancellationToken).ConfigureAwait(false);
await AppendAuditAsync(null, "list-keys", null, cancellationToken).ConfigureAwait(false);
return new ApiKeyAdminOutput(
"list-keys",
"ok",
null,
keys.Select(ToListedKey).ToArray());
}
private async Task<ApiKeyAdminOutput> RevokeKeyAsync(
ApiKeyAdminCommand command,
CancellationToken cancellationToken)
{
await migrator.MigrateAsync(cancellationToken).ConfigureAwait(false);
string keyId = Required(command.KeyId);
bool revoked = await adminStore.RevokeAsync(keyId, DateTimeOffset.UtcNow, cancellationToken)
.ConfigureAwait(false);
await AppendAuditAsync(keyId, "revoke-key", revoked ? "revoked" : "not-found-or-already-revoked", cancellationToken)
.ConfigureAwait(false);
return new ApiKeyAdminOutput("revoke-key", revoked ? "revoked" : "not-found-or-already-revoked", null, []);
}
private async Task<ApiKeyAdminOutput> RotateKeyAsync(
ApiKeyAdminCommand command,
CancellationToken cancellationToken)
{
await migrator.MigrateAsync(cancellationToken).ConfigureAwait(false);
string keyId = Required(command.KeyId);
string secret = ApiKeySecretGenerator.Generate();
string apiKey = FormatApiKey(keyId, secret);
bool rotated = await adminStore.RotateAsync(keyId, hasher.HashSecret(secret), DateTimeOffset.UtcNow, cancellationToken)
.ConfigureAwait(false);
await AppendAuditAsync(keyId, "rotate-key", rotated ? "rotated" : "not-found", cancellationToken)
.ConfigureAwait(false);
return new ApiKeyAdminOutput("rotate-key", rotated ? "rotated" : "not-found", rotated ? apiKey : null, []);
}
private static async Task WriteOutputAsync(
ApiKeyAdminCommand command,
ApiKeyAdminOutput result,
TextWriter output)
{
if (command.Json)
{
await output.WriteLineAsync(JsonSerializer.Serialize(result, JsonOptions)).ConfigureAwait(false);
return;
}
await output.WriteLineAsync($"{result.Command}: {result.Status}").ConfigureAwait(false);
if (result.ApiKey is not null)
{
await output.WriteLineAsync($"API key: {result.ApiKey}").ConfigureAwait(false);
}
foreach (ApiKeyAdminListedKey key in result.Keys)
{
string revoked = key.RevokedUtc is null ? "active" : "revoked";
await output.WriteLineAsync($"{key.KeyId}\t{key.DisplayName}\t{revoked}\t{string.Join(',', key.Scopes)}")
.ConfigureAwait(false);
}
}
private async Task AppendAuditAsync(
string? keyId,
string eventType,
string? details,
CancellationToken cancellationToken)
{
await auditStore.AppendAsync(
new ApiKeyAuditEntry(
KeyId: keyId,
EventType: eventType,
RemoteAddress: null,
Details: details),
cancellationToken)
.ConfigureAwait(false);
}
private static ApiKeyAdminListedKey ToListedKey(ApiKeyRecord key)
{
return new ApiKeyAdminListedKey(
KeyId: key.KeyId,
KeyPrefix: key.KeyPrefix,
DisplayName: key.DisplayName,
Scopes: key.Scopes,
CreatedUtc: key.CreatedUtc,
LastUsedUtc: key.LastUsedUtc,
RevokedUtc: key.RevokedUtc);
}
private static string FormatApiKey(string keyId, string secret)
{
return $"mxgw_{keyId}_{secret}";
}
private static string Required(string? value)
{
return value ?? throw new InvalidOperationException("Required command value was not provided.");
}
}
@@ -0,0 +1,10 @@
namespace MxGateway.Server.Security.Authentication;
public sealed record ApiKeyAdminCommand(
ApiKeyAdminCommandKind Kind,
bool Json,
string? SqlitePath,
string? Pepper,
string? KeyId,
string? DisplayName,
IReadOnlySet<string> Scopes);
@@ -0,0 +1,10 @@
namespace MxGateway.Server.Security.Authentication;
public enum ApiKeyAdminCommandKind
{
InitDb,
CreateKey,
ListKeys,
RevokeKey,
RotateKey
}
@@ -0,0 +1,159 @@
namespace MxGateway.Server.Security.Authentication;
public static class ApiKeyAdminCommandLineParser
{
public static ApiKeyAdminParseResult Parse(IReadOnlyList<string> args)
{
if (args.Count == 0 || !string.Equals(args[0], "apikey", StringComparison.OrdinalIgnoreCase))
{
return ApiKeyAdminParseResult.NotApiKeyCommand();
}
if (args.Count < 2)
{
return ApiKeyAdminParseResult.Fail("Missing apikey subcommand.");
}
if (!TryParseKind(args[1], out ApiKeyAdminCommandKind kind))
{
return ApiKeyAdminParseResult.Fail($"Unknown apikey subcommand '{args[1]}'.");
}
Dictionary<string, string?> options = new(StringComparer.OrdinalIgnoreCase);
bool json = false;
for (int index = 2; index < args.Count; index++)
{
string arg = args[index];
if (string.Equals(arg, "--json", StringComparison.OrdinalIgnoreCase))
{
json = true;
continue;
}
if (!arg.StartsWith("--", StringComparison.Ordinal))
{
return ApiKeyAdminParseResult.Fail($"Unexpected argument '{arg}'.");
}
string name = arg[2..];
string? value;
int equalsIndex = name.IndexOf('=', StringComparison.Ordinal);
if (equalsIndex >= 0)
{
value = name[(equalsIndex + 1)..];
name = name[..equalsIndex];
}
else
{
if (index + 1 >= args.Count || args[index + 1].StartsWith("--", StringComparison.Ordinal))
{
return ApiKeyAdminParseResult.Fail($"Option '--{name}' requires a value.");
}
value = args[++index];
}
options[name] = value;
}
string? keyId = GetOption(options, "key-id");
string? displayName = GetOption(options, "display-name");
IReadOnlySet<string> scopes = ParseScopes(GetOption(options, "scopes"));
string? validationError = Validate(kind, keyId, displayName);
if (validationError is not null)
{
return ApiKeyAdminParseResult.Fail(validationError);
}
return ApiKeyAdminParseResult.Success(new ApiKeyAdminCommand(
Kind: kind,
Json: json,
SqlitePath: GetOption(options, "sqlite-path"),
Pepper: GetOption(options, "pepper"),
KeyId: keyId,
DisplayName: displayName,
Scopes: scopes));
}
private static bool TryParseKind(string value, out ApiKeyAdminCommandKind kind)
{
switch (value.ToLowerInvariant())
{
case "init-db":
kind = ApiKeyAdminCommandKind.InitDb;
return true;
case "create-key":
kind = ApiKeyAdminCommandKind.CreateKey;
return true;
case "list-keys":
kind = ApiKeyAdminCommandKind.ListKeys;
return true;
case "revoke-key":
kind = ApiKeyAdminCommandKind.RevokeKey;
return true;
case "rotate-key":
kind = ApiKeyAdminCommandKind.RotateKey;
return true;
default:
kind = default;
return false;
}
}
private static string? Validate(ApiKeyAdminCommandKind kind, string? keyId, string? displayName)
{
if (kind is ApiKeyAdminCommandKind.CreateKey or ApiKeyAdminCommandKind.RevokeKey or ApiKeyAdminCommandKind.RotateKey
&& string.IsNullOrWhiteSpace(keyId))
{
return $"Subcommand '{KindName(kind)}' requires --key-id.";
}
if (!string.IsNullOrWhiteSpace(keyId) && !IsValidKeyId(keyId))
{
return "API key id may contain only letters, numbers, periods, and hyphens.";
}
if (kind == ApiKeyAdminCommandKind.CreateKey && string.IsNullOrWhiteSpace(displayName))
{
return "Subcommand 'create-key' requires --display-name.";
}
return null;
}
private static string KindName(ApiKeyAdminCommandKind kind)
{
return kind switch
{
ApiKeyAdminCommandKind.InitDb => "init-db",
ApiKeyAdminCommandKind.CreateKey => "create-key",
ApiKeyAdminCommandKind.ListKeys => "list-keys",
ApiKeyAdminCommandKind.RevokeKey => "revoke-key",
ApiKeyAdminCommandKind.RotateKey => "rotate-key",
_ => kind.ToString()
};
}
private static bool IsValidKeyId(string keyId)
{
return keyId.All(character =>
char.IsAsciiLetterOrDigit(character)
|| character is '.' or '-');
}
private static string? GetOption(Dictionary<string, string?> options, string name)
{
return options.TryGetValue(name, out string? value) ? value : null;
}
private static IReadOnlySet<string> ParseScopes(string? scopes)
{
return new HashSet<string>(
(scopes ?? string.Empty)
.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries),
StringComparer.Ordinal);
}
}
@@ -0,0 +1,10 @@
namespace MxGateway.Server.Security.Authentication;
public sealed record ApiKeyAdminListedKey(
string KeyId,
string KeyPrefix,
string DisplayName,
IReadOnlySet<string> Scopes,
DateTimeOffset CreatedUtc,
DateTimeOffset? LastUsedUtc,
DateTimeOffset? RevokedUtc);
@@ -0,0 +1,7 @@
namespace MxGateway.Server.Security.Authentication;
public sealed record ApiKeyAdminOutput(
string Command,
string Status,
string? ApiKey,
IReadOnlyList<ApiKeyAdminListedKey> Keys);
@@ -0,0 +1,22 @@
namespace MxGateway.Server.Security.Authentication;
public sealed record ApiKeyAdminParseResult(
bool IsApiKeyCommand,
ApiKeyAdminCommand? Command,
string? Error)
{
public static ApiKeyAdminParseResult NotApiKeyCommand()
{
return new ApiKeyAdminParseResult(false, null, null);
}
public static ApiKeyAdminParseResult Success(ApiKeyAdminCommand command)
{
return new ApiKeyAdminParseResult(true, command, null);
}
public static ApiKeyAdminParseResult Fail(string error)
{
return new ApiKeyAdminParseResult(true, null, error);
}
}
@@ -0,0 +1,7 @@
namespace MxGateway.Server.Security.Authentication;
public sealed record ApiKeyAuditEntry(
string? KeyId,
string EventType,
string? RemoteAddress,
string? Details);
@@ -0,0 +1,9 @@
namespace MxGateway.Server.Security.Authentication;
public sealed record ApiKeyAuditRecord(
long AuditId,
string? KeyId,
string EventType,
string? RemoteAddress,
DateTimeOffset CreatedUtc,
string? Details);
@@ -0,0 +1,9 @@
namespace MxGateway.Server.Security.Authentication;
public sealed record ApiKeyCreateRequest(
string KeyId,
string KeyPrefix,
byte[] SecretHash,
string DisplayName,
IReadOnlySet<string> Scopes,
DateTimeOffset CreatedUtc);
@@ -0,0 +1,7 @@
namespace MxGateway.Server.Security.Authentication;
public sealed record ApiKeyIdentity(
string KeyId,
string KeyPrefix,
string DisplayName,
IReadOnlySet<string> Scopes);
@@ -0,0 +1,45 @@
namespace MxGateway.Server.Security.Authentication;
public sealed class ApiKeyParser : IApiKeyParser
{
private const string BearerPrefix = "Bearer ";
private const string TokenPrefix = "mxgw_";
public bool TryParseAuthorizationHeader(string? authorizationHeader, out ParsedApiKey? apiKey)
{
apiKey = null;
if (string.IsNullOrWhiteSpace(authorizationHeader)
|| !authorizationHeader.StartsWith(BearerPrefix, StringComparison.OrdinalIgnoreCase))
{
return false;
}
string token = authorizationHeader[BearerPrefix.Length..].Trim();
if (!token.StartsWith(TokenPrefix, StringComparison.OrdinalIgnoreCase))
{
return false;
}
string keyPayload = token[TokenPrefix.Length..];
int separatorIndex = keyPayload.IndexOf('_', StringComparison.Ordinal);
if (separatorIndex <= 0 || separatorIndex == keyPayload.Length - 1)
{
return false;
}
string keyId = keyPayload[..separatorIndex];
string secret = keyPayload[(separatorIndex + 1)..];
if (string.IsNullOrWhiteSpace(keyId) || string.IsNullOrWhiteSpace(secret))
{
return false;
}
apiKey = new ParsedApiKey(keyId, secret);
return true;
}
}
@@ -0,0 +1,4 @@
namespace MxGateway.Server.Security.Authentication;
public sealed class ApiKeyPepperUnavailableException(string pepperSecretName)
: InvalidOperationException($"API key pepper secret '{pepperSecretName}' is not configured.");
@@ -0,0 +1,11 @@
namespace MxGateway.Server.Security.Authentication;
public sealed record ApiKeyRecord(
string KeyId,
string KeyPrefix,
byte[] SecretHash,
string DisplayName,
IReadOnlySet<string> Scopes,
DateTimeOffset CreatedUtc,
DateTimeOffset? LastUsedUtc,
DateTimeOffset? RevokedUtc);
@@ -0,0 +1,26 @@
using Microsoft.Data.Sqlite;
namespace MxGateway.Server.Security.Authentication;
public static class ApiKeyRecordReader
{
public static ApiKeyRecord Read(SqliteDataReader reader)
{
return new ApiKeyRecord(
KeyId: reader.GetString(0),
KeyPrefix: reader.GetString(1),
SecretHash: (byte[])reader["secret_hash"],
DisplayName: reader.GetString(3),
Scopes: ApiKeyScopeSerializer.Deserialize(reader.GetString(4)),
CreatedUtc: DateTimeOffset.Parse(reader.GetString(5), System.Globalization.CultureInfo.InvariantCulture),
LastUsedUtc: ReadNullableDateTimeOffset(reader, 6),
RevokedUtc: ReadNullableDateTimeOffset(reader, 7));
}
private static DateTimeOffset? ReadNullableDateTimeOffset(SqliteDataReader reader, int ordinal)
{
return reader.IsDBNull(ordinal)
? null
: DateTimeOffset.Parse(reader.GetString(ordinal), System.Globalization.CultureInfo.InvariantCulture);
}
}
@@ -0,0 +1,23 @@
using System.Text.Json;
namespace MxGateway.Server.Security.Authentication;
public static class ApiKeyScopeSerializer
{
public static string Serialize(IReadOnlySet<string> scopes)
{
return JsonSerializer.Serialize(scopes.Order(StringComparer.Ordinal));
}
public static IReadOnlySet<string> Deserialize(string value)
{
if (string.IsNullOrWhiteSpace(value))
{
return new HashSet<string>(StringComparer.Ordinal);
}
string[]? scopes = JsonSerializer.Deserialize<string[]>(value);
return new HashSet<string>(scopes ?? [], StringComparer.Ordinal);
}
}
@@ -0,0 +1,17 @@
using System.Security.Cryptography;
namespace MxGateway.Server.Security.Authentication;
public static class ApiKeySecretGenerator
{
public static string Generate()
{
Span<byte> bytes = stackalloc byte[32];
RandomNumberGenerator.Fill(bytes);
return Convert.ToBase64String(bytes)
.TrimEnd('=')
.Replace('+', '-')
.Replace('/', '_');
}
}
@@ -0,0 +1,35 @@
using System.Security.Cryptography;
using System.Text;
using Microsoft.Extensions.Options;
using MxGateway.Server.Configuration;
namespace MxGateway.Server.Security.Authentication;
public sealed class ApiKeySecretHasher(
IConfiguration configuration,
IOptions<GatewayOptions> options) : IApiKeySecretHasher
{
public byte[] HashSecret(string secret)
{
string pepper = GetPepper();
byte[] pepperBytes = Encoding.UTF8.GetBytes(pepper);
byte[] secretBytes = Encoding.UTF8.GetBytes(secret);
using HMACSHA256 hmac = new(pepperBytes);
return hmac.ComputeHash(secretBytes);
}
private string GetPepper()
{
string pepperSecretName = options.Value.Authentication.PepperSecretName;
string? pepper = configuration[pepperSecretName];
if (string.IsNullOrWhiteSpace(pepper))
{
throw new ApiKeyPepperUnavailableException(pepperSecretName);
}
return pepper;
}
}
@@ -0,0 +1,11 @@
namespace MxGateway.Server.Security.Authentication;
public enum ApiKeyVerificationFailure
{
None,
MissingOrMalformedCredentials,
PepperUnavailable,
KeyNotFound,
KeyRevoked,
SecretMismatch
}
@@ -0,0 +1,23 @@
namespace MxGateway.Server.Security.Authentication;
public sealed record ApiKeyVerificationResult(
bool Succeeded,
ApiKeyIdentity? Identity,
ApiKeyVerificationFailure Failure)
{
public static ApiKeyVerificationResult Success(ApiKeyIdentity identity)
{
return new ApiKeyVerificationResult(
Succeeded: true,
Identity: identity,
Failure: ApiKeyVerificationFailure.None);
}
public static ApiKeyVerificationResult Fail(ApiKeyVerificationFailure failure)
{
return new ApiKeyVerificationResult(
Succeeded: false,
Identity: null,
Failure: failure);
}
}
@@ -0,0 +1,57 @@
using System.Security.Cryptography;
namespace MxGateway.Server.Security.Authentication;
public sealed class ApiKeyVerifier(
IApiKeyParser parser,
IApiKeySecretHasher hasher,
IApiKeyStore keyStore) : IApiKeyVerifier
{
public async Task<ApiKeyVerificationResult> VerifyAsync(
string? authorizationHeader,
CancellationToken cancellationToken)
{
if (!parser.TryParseAuthorizationHeader(authorizationHeader, out ParsedApiKey? parsedKey)
|| parsedKey is null)
{
return ApiKeyVerificationResult.Fail(ApiKeyVerificationFailure.MissingOrMalformedCredentials);
}
ApiKeyRecord? storedKey = await keyStore.FindByKeyIdAsync(parsedKey.KeyId, cancellationToken)
.ConfigureAwait(false);
if (storedKey is null)
{
return ApiKeyVerificationResult.Fail(ApiKeyVerificationFailure.KeyNotFound);
}
if (storedKey.RevokedUtc is not null)
{
return ApiKeyVerificationResult.Fail(ApiKeyVerificationFailure.KeyRevoked);
}
byte[] presentedHash;
try
{
presentedHash = hasher.HashSecret(parsedKey.Secret);
}
catch (ApiKeyPepperUnavailableException)
{
return ApiKeyVerificationResult.Fail(ApiKeyVerificationFailure.PepperUnavailable);
}
if (!CryptographicOperations.FixedTimeEquals(presentedHash, storedKey.SecretHash))
{
return ApiKeyVerificationResult.Fail(ApiKeyVerificationFailure.SecretMismatch);
}
await keyStore.MarkKeyUsedAsync(storedKey.KeyId, DateTimeOffset.UtcNow, cancellationToken)
.ConfigureAwait(false);
return ApiKeyVerificationResult.Success(new ApiKeyIdentity(
KeyId: storedKey.KeyId,
KeyPrefix: storedKey.KeyPrefix,
DisplayName: storedKey.DisplayName,
Scopes: storedKey.Scopes));
}
}
@@ -0,0 +1,27 @@
using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Options;
using MxGateway.Server.Configuration;
namespace MxGateway.Server.Security.Authentication;
public sealed class AuthSqliteConnectionFactory(IOptions<GatewayOptions> options)
{
public SqliteConnection CreateConnection()
{
string sqlitePath = options.Value.Authentication.SqlitePath;
string? directory = Path.GetDirectoryName(sqlitePath);
if (!string.IsNullOrWhiteSpace(directory))
{
Directory.CreateDirectory(directory);
}
SqliteConnectionStringBuilder builder = new()
{
DataSource = sqlitePath,
Mode = SqliteOpenMode.ReadWriteCreate
};
return new SqliteConnection(builder.ToString());
}
}
@@ -0,0 +1,3 @@
namespace MxGateway.Server.Security.Authentication;
public sealed class AuthStoreMigrationException(string message) : InvalidOperationException(message);
@@ -0,0 +1,24 @@
using Microsoft.Extensions.Options;
using MxGateway.Server.Configuration;
namespace MxGateway.Server.Security.Authentication;
public sealed class AuthStoreMigrationHostedService(
IOptions<GatewayOptions> options,
IAuthStoreMigrator migrator) : IHostedService
{
public async Task StartAsync(CancellationToken cancellationToken)
{
AuthenticationOptions authentication = options.Value.Authentication;
if (authentication.Mode == AuthenticationMode.ApiKey && authentication.RunMigrationsOnStartup)
{
await migrator.MigrateAsync(cancellationToken).ConfigureAwait(false);
}
}
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
@@ -0,0 +1,20 @@
namespace MxGateway.Server.Security.Authentication;
public static class AuthStoreServiceCollectionExtensions
{
public static IServiceCollection AddSqliteAuthStore(this IServiceCollection services)
{
services.AddSingleton<IApiKeyParser, ApiKeyParser>();
services.AddSingleton<IApiKeySecretHasher, ApiKeySecretHasher>();
services.AddSingleton<IApiKeyVerifier, ApiKeyVerifier>();
services.AddSingleton<ApiKeyAdminCliRunner>();
services.AddSingleton<AuthSqliteConnectionFactory>();
services.AddSingleton<IAuthStoreMigrator, SqliteAuthStoreMigrator>();
services.AddSingleton<IApiKeyStore, SqliteApiKeyStore>();
services.AddSingleton<IApiKeyAdminStore, SqliteApiKeyAdminStore>();
services.AddSingleton<IApiKeyAuditStore, SqliteApiKeyAuditStore>();
services.AddHostedService<AuthStoreMigrationHostedService>();
return services;
}
}
@@ -0,0 +1,16 @@
namespace MxGateway.Server.Security.Authentication;
public interface IApiKeyAdminStore
{
Task CreateAsync(ApiKeyCreateRequest request, CancellationToken cancellationToken);
Task<IReadOnlyList<ApiKeyRecord>> ListAsync(CancellationToken cancellationToken);
Task<bool> RevokeAsync(string keyId, DateTimeOffset revokedUtc, CancellationToken cancellationToken);
Task<bool> RotateAsync(
string keyId,
byte[] secretHash,
DateTimeOffset rotatedUtc,
CancellationToken cancellationToken);
}
@@ -0,0 +1,8 @@
namespace MxGateway.Server.Security.Authentication;
public interface IApiKeyAuditStore
{
Task AppendAsync(ApiKeyAuditEntry entry, CancellationToken cancellationToken);
Task<IReadOnlyList<ApiKeyAuditRecord>> ListRecentAsync(int count, CancellationToken cancellationToken);
}
@@ -0,0 +1,6 @@
namespace MxGateway.Server.Security.Authentication;
public interface IApiKeyParser
{
bool TryParseAuthorizationHeader(string? authorizationHeader, out ParsedApiKey? apiKey);
}
@@ -0,0 +1,6 @@
namespace MxGateway.Server.Security.Authentication;
public interface IApiKeySecretHasher
{
byte[] HashSecret(string secret);
}
@@ -0,0 +1,10 @@
namespace MxGateway.Server.Security.Authentication;
public interface IApiKeyStore
{
Task<ApiKeyRecord?> FindByKeyIdAsync(string keyId, CancellationToken cancellationToken);
Task<ApiKeyRecord?> FindActiveByKeyIdAsync(string keyId, CancellationToken cancellationToken);
Task MarkKeyUsedAsync(string keyId, DateTimeOffset usedUtc, CancellationToken cancellationToken);
}
@@ -0,0 +1,8 @@
namespace MxGateway.Server.Security.Authentication;
public interface IApiKeyVerifier
{
Task<ApiKeyVerificationResult> VerifyAsync(
string? authorizationHeader,
CancellationToken cancellationToken);
}
@@ -0,0 +1,6 @@
namespace MxGateway.Server.Security.Authentication;
public interface IAuthStoreMigrator
{
Task MigrateAsync(CancellationToken cancellationToken);
}
@@ -0,0 +1,3 @@
namespace MxGateway.Server.Security.Authentication;
public sealed record ParsedApiKey(string KeyId, string Secret);
@@ -0,0 +1,116 @@
using Microsoft.Data.Sqlite;
namespace MxGateway.Server.Security.Authentication;
public sealed class SqliteApiKeyAdminStore(AuthSqliteConnectionFactory connectionFactory) : IApiKeyAdminStore
{
public async Task CreateAsync(ApiKeyCreateRequest request, CancellationToken cancellationToken)
{
await using SqliteConnection connection = connectionFactory.CreateConnection();
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
await using SqliteCommand command = connection.CreateCommand();
command.CommandText = """
INSERT INTO api_keys (
key_id,
key_prefix,
secret_hash,
display_name,
scopes,
created_utc,
last_used_utc,
revoked_utc)
VALUES (
$key_id,
$key_prefix,
$secret_hash,
$display_name,
$scopes,
$created_utc,
NULL,
NULL);
""";
AddCreateParameters(command, request);
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
public async Task<IReadOnlyList<ApiKeyRecord>> ListAsync(CancellationToken cancellationToken)
{
await using SqliteConnection connection = connectionFactory.CreateConnection();
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
await using SqliteCommand command = connection.CreateCommand();
command.CommandText = """
SELECT key_id, key_prefix, secret_hash, display_name, scopes, created_utc, last_used_utc, revoked_utc
FROM api_keys
ORDER BY key_id;
""";
List<ApiKeyRecord> records = [];
await using SqliteDataReader reader = await command.ExecuteReaderAsync(cancellationToken)
.ConfigureAwait(false);
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
records.Add(ApiKeyRecordReader.Read(reader));
}
return records;
}
public async Task<bool> RevokeAsync(string keyId, DateTimeOffset revokedUtc, CancellationToken cancellationToken)
{
await using SqliteConnection connection = connectionFactory.CreateConnection();
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
await using SqliteCommand command = connection.CreateCommand();
command.CommandText = """
UPDATE api_keys
SET revoked_utc = $revoked_utc
WHERE key_id = $key_id AND revoked_utc IS NULL;
""";
command.Parameters.AddWithValue("$key_id", keyId);
command.Parameters.AddWithValue("$revoked_utc", revokedUtc.ToString("O"));
int rows = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
return rows > 0;
}
public async Task<bool> RotateAsync(
string keyId,
byte[] secretHash,
DateTimeOffset rotatedUtc,
CancellationToken cancellationToken)
{
await using SqliteConnection connection = connectionFactory.CreateConnection();
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
await using SqliteCommand command = connection.CreateCommand();
command.CommandText = """
UPDATE api_keys
SET secret_hash = $secret_hash,
last_used_utc = NULL,
revoked_utc = NULL
WHERE key_id = $key_id;
""";
command.Parameters.AddWithValue("$key_id", keyId);
command.Parameters.Add("$secret_hash", SqliteType.Blob).Value = secretHash;
int rows = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
return rows > 0;
}
private static void AddCreateParameters(SqliteCommand command, ApiKeyCreateRequest request)
{
command.Parameters.AddWithValue("$key_id", request.KeyId);
command.Parameters.AddWithValue("$key_prefix", request.KeyPrefix);
command.Parameters.Add("$secret_hash", SqliteType.Blob).Value = request.SecretHash;
command.Parameters.AddWithValue("$display_name", request.DisplayName);
command.Parameters.AddWithValue("$scopes", ApiKeyScopeSerializer.Serialize(request.Scopes));
command.Parameters.AddWithValue("$created_utc", request.CreatedUtc.ToString("O"));
}
}
@@ -0,0 +1,65 @@
using Microsoft.Data.Sqlite;
namespace MxGateway.Server.Security.Authentication;
public sealed class SqliteApiKeyAuditStore(AuthSqliteConnectionFactory connectionFactory) : IApiKeyAuditStore
{
public async Task AppendAsync(ApiKeyAuditEntry entry, CancellationToken cancellationToken)
{
await using SqliteConnection connection = connectionFactory.CreateConnection();
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
await using SqliteCommand command = connection.CreateCommand();
command.CommandText = """
INSERT INTO api_key_audit (key_id, event_type, remote_address, created_utc, details)
VALUES ($key_id, $event_type, $remote_address, $created_utc, $details);
""";
command.Parameters.AddWithValue("$key_id", (object?)entry.KeyId ?? DBNull.Value);
command.Parameters.AddWithValue("$event_type", entry.EventType);
command.Parameters.AddWithValue("$remote_address", (object?)entry.RemoteAddress ?? DBNull.Value);
command.Parameters.AddWithValue("$created_utc", DateTimeOffset.UtcNow.ToString("O"));
command.Parameters.AddWithValue("$details", (object?)entry.Details ?? DBNull.Value);
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
public async Task<IReadOnlyList<ApiKeyAuditRecord>> ListRecentAsync(int count, CancellationToken cancellationToken)
{
if (count <= 0)
{
return [];
}
await using SqliteConnection connection = connectionFactory.CreateConnection();
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
await using SqliteCommand command = connection.CreateCommand();
command.CommandText = """
SELECT audit_id, key_id, event_type, remote_address, created_utc, details
FROM api_key_audit
ORDER BY audit_id DESC
LIMIT $count;
""";
command.Parameters.AddWithValue("$count", count);
List<ApiKeyAuditRecord> records = [];
await using SqliteDataReader reader = await command.ExecuteReaderAsync(cancellationToken)
.ConfigureAwait(false);
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
records.Add(new ApiKeyAuditRecord(
AuditId: reader.GetInt64(0),
KeyId: reader.IsDBNull(1) ? null : reader.GetString(1),
EventType: reader.GetString(2),
RemoteAddress: reader.IsDBNull(3) ? null : reader.GetString(3),
CreatedUtc: DateTimeOffset.Parse(
reader.GetString(4),
System.Globalization.CultureInfo.InvariantCulture),
Details: reader.IsDBNull(5) ? null : reader.GetString(5)));
}
return records;
}
}
@@ -0,0 +1,66 @@
using Microsoft.Data.Sqlite;
namespace MxGateway.Server.Security.Authentication;
public sealed class SqliteApiKeyStore(AuthSqliteConnectionFactory connectionFactory) : IApiKeyStore
{
public Task<ApiKeyRecord?> FindByKeyIdAsync(string keyId, CancellationToken cancellationToken)
{
return FindByKeyIdAsync(keyId, requireActive: false, cancellationToken);
}
public Task<ApiKeyRecord?> FindActiveByKeyIdAsync(string keyId, CancellationToken cancellationToken)
{
return FindByKeyIdAsync(keyId, requireActive: true, cancellationToken);
}
public async Task MarkKeyUsedAsync(string keyId, DateTimeOffset usedUtc, CancellationToken cancellationToken)
{
await using SqliteConnection connection = connectionFactory.CreateConnection();
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
await using SqliteCommand command = connection.CreateCommand();
command.CommandText = """
UPDATE api_keys
SET last_used_utc = $last_used_utc
WHERE key_id = $key_id AND revoked_utc IS NULL;
""";
command.Parameters.AddWithValue("$key_id", keyId);
command.Parameters.AddWithValue("$last_used_utc", usedUtc.ToString("O"));
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
private async Task<ApiKeyRecord?> FindByKeyIdAsync(
string keyId,
bool requireActive,
CancellationToken cancellationToken)
{
await using SqliteConnection connection = connectionFactory.CreateConnection();
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
await using SqliteCommand command = connection.CreateCommand();
command.CommandText = requireActive
? """
SELECT key_id, key_prefix, secret_hash, display_name, scopes, created_utc, last_used_utc, revoked_utc
FROM api_keys
WHERE key_id = $key_id AND revoked_utc IS NULL;
"""
: """
SELECT key_id, key_prefix, secret_hash, display_name, scopes, created_utc, last_used_utc, revoked_utc
FROM api_keys
WHERE key_id = $key_id;
""";
command.Parameters.AddWithValue("$key_id", keyId);
await using SqliteDataReader reader = await command.ExecuteReaderAsync(cancellationToken)
.ConfigureAwait(false);
if (!await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
return null;
}
return ApiKeyRecordReader.Read(reader);
}
}
@@ -0,0 +1,12 @@
namespace MxGateway.Server.Security.Authentication;
public static class SqliteAuthSchema
{
public const int CurrentVersion = 1;
public const string SchemaVersionTable = "schema_version";
public const string ApiKeysTable = "api_keys";
public const string ApiKeyAuditTable = "api_key_audit";
}
@@ -0,0 +1,135 @@
using Microsoft.Data.Sqlite;
namespace MxGateway.Server.Security.Authentication;
public sealed class SqliteAuthStoreMigrator(AuthSqliteConnectionFactory connectionFactory) : IAuthStoreMigrator
{
public async Task MigrateAsync(CancellationToken cancellationToken)
{
await using SqliteConnection connection = connectionFactory.CreateConnection();
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
await using SqliteTransaction transaction =
(SqliteTransaction)await connection.BeginTransactionAsync(cancellationToken).ConfigureAwait(false);
int existingVersion = await ReadExistingSchemaVersionAsync(connection, transaction, cancellationToken)
.ConfigureAwait(false);
if (existingVersion > SqliteAuthSchema.CurrentVersion)
{
throw new AuthStoreMigrationException(
$"Auth database schema version {existingVersion} is newer than supported version {SqliteAuthSchema.CurrentVersion}.");
}
await ApplyVersionOneAsync(connection, transaction, cancellationToken).ConfigureAwait(false);
await transaction.CommitAsync(cancellationToken).ConfigureAwait(false);
}
private static async Task<int> ReadExistingSchemaVersionAsync(
SqliteConnection connection,
SqliteTransaction transaction,
CancellationToken cancellationToken)
{
await using SqliteCommand tableExistsCommand = connection.CreateCommand();
tableExistsCommand.Transaction = transaction;
tableExistsCommand.CommandText = """
SELECT COUNT(*)
FROM sqlite_master
WHERE type = 'table' AND name = $table_name;
""";
tableExistsCommand.Parameters.AddWithValue("$table_name", SqliteAuthSchema.SchemaVersionTable);
long tableCount = (long)(await tableExistsCommand.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false) ?? 0L);
if (tableCount == 0)
{
return 0;
}
await using SqliteCommand versionCommand = connection.CreateCommand();
versionCommand.Transaction = transaction;
versionCommand.CommandText = """
SELECT version
FROM schema_version
WHERE id = 1;
""";
object? version = await versionCommand.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false);
return version is null || version == DBNull.Value
? 0
: Convert.ToInt32(version, System.Globalization.CultureInfo.InvariantCulture);
}
private static async Task ApplyVersionOneAsync(
SqliteConnection connection,
SqliteTransaction transaction,
CancellationToken cancellationToken)
{
await ExecuteNonQueryAsync(
connection,
transaction,
"""
CREATE TABLE IF NOT EXISTS schema_version (
id INTEGER PRIMARY KEY CHECK (id = 1),
version INTEGER NOT NULL,
applied_utc TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS api_keys (
key_id TEXT PRIMARY KEY,
key_prefix TEXT NOT NULL,
secret_hash BLOB NOT NULL,
display_name TEXT NOT NULL,
scopes TEXT NOT NULL,
created_utc TEXT NOT NULL,
last_used_utc TEXT NULL,
revoked_utc TEXT NULL
);
CREATE TABLE IF NOT EXISTS api_key_audit (
audit_id INTEGER PRIMARY KEY AUTOINCREMENT,
key_id TEXT NULL,
event_type TEXT NOT NULL,
remote_address TEXT NULL,
created_utc TEXT NOT NULL,
details TEXT NULL
);
CREATE INDEX IF NOT EXISTS ix_api_keys_revoked_utc
ON api_keys (revoked_utc);
CREATE INDEX IF NOT EXISTS ix_api_key_audit_key_id_created_utc
ON api_key_audit (key_id, created_utc);
""",
cancellationToken).ConfigureAwait(false);
await using SqliteCommand versionCommand = connection.CreateCommand();
versionCommand.Transaction = transaction;
versionCommand.CommandText = """
INSERT INTO schema_version (id, version, applied_utc)
VALUES (1, $version, $applied_utc)
ON CONFLICT(id) DO UPDATE SET
version = excluded.version,
applied_utc = excluded.applied_utc;
""";
versionCommand.Parameters.AddWithValue("$version", SqliteAuthSchema.CurrentVersion);
versionCommand.Parameters.AddWithValue("$applied_utc", DateTimeOffset.UtcNow.ToString("O"));
await versionCommand.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
private static async Task ExecuteNonQueryAsync(
SqliteConnection connection,
SqliteTransaction transaction,
string commandText,
CancellationToken cancellationToken)
{
await using SqliteCommand command = connection.CreateCommand();
command.Transaction = transaction;
command.CommandText = commandText;
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
}
@@ -0,0 +1,74 @@
using Grpc.Core;
using Grpc.Core.Interceptors;
using Microsoft.Extensions.Options;
using MxGateway.Server.Configuration;
using MxGateway.Server.Security.Authentication;
namespace MxGateway.Server.Security.Authorization;
public sealed class GatewayGrpcAuthorizationInterceptor(
IApiKeyVerifier apiKeyVerifier,
GatewayGrpcScopeResolver scopeResolver,
IGatewayRequestIdentityAccessor identityAccessor,
IOptions<GatewayOptions> options) : Interceptor
{
public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>(
TRequest request,
ServerCallContext context,
UnaryServerMethod<TRequest, TResponse> continuation)
{
ApiKeyIdentity? identity = await AuthenticateAndAuthorizeAsync(request, context).ConfigureAwait(false);
IDisposable? identityScope = identity is null ? null : identityAccessor.Push(identity);
using (identityScope)
{
return await continuation(request, context).ConfigureAwait(false);
}
}
public override async Task ServerStreamingServerHandler<TRequest, TResponse>(
TRequest request,
IServerStreamWriter<TResponse> responseStream,
ServerCallContext context,
ServerStreamingServerMethod<TRequest, TResponse> continuation)
{
ApiKeyIdentity? identity = await AuthenticateAndAuthorizeAsync(request, context).ConfigureAwait(false);
IDisposable? identityScope = identity is null ? null : identityAccessor.Push(identity);
using (identityScope)
{
await continuation(request, responseStream, context).ConfigureAwait(false);
}
}
private async Task<ApiKeyIdentity?> AuthenticateAndAuthorizeAsync<TRequest>(
TRequest request,
ServerCallContext context)
where TRequest : class
{
if (options.Value.Authentication.Mode == AuthenticationMode.Disabled)
{
return null;
}
string? authorizationHeader = context.RequestHeaders.GetValue("authorization");
ApiKeyVerificationResult verificationResult = await apiKeyVerifier
.VerifyAsync(authorizationHeader, context.CancellationToken)
.ConfigureAwait(false);
if (!verificationResult.Succeeded || verificationResult.Identity is null)
{
throw new RpcException(new Status(
StatusCode.Unauthenticated,
"Missing or invalid API key."));
}
string requiredScope = scopeResolver.ResolveRequiredScope(request);
if (!verificationResult.Identity.Scopes.Contains(requiredScope))
{
throw new RpcException(new Status(
StatusCode.PermissionDenied,
$"API key is missing required scope '{requiredScope}'."));
}
return verificationResult.Identity;
}
}
@@ -0,0 +1,40 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Security.Authorization;
public sealed class GatewayGrpcScopeResolver
{
public string ResolveRequiredScope(object request)
{
return request switch
{
OpenSessionRequest => GatewayScopes.SessionOpen,
CloseSessionRequest => GatewayScopes.SessionClose,
StreamEventsRequest => GatewayScopes.EventsRead,
MxCommandRequest commandRequest => ResolveCommandScope(commandRequest.Command?.Kind ?? MxCommandKind.Unspecified),
_ => GatewayScopes.Admin
};
}
private static string ResolveCommandScope(MxCommandKind kind)
{
return kind switch
{
MxCommandKind.Write or
MxCommandKind.Write2 => GatewayScopes.InvokeWrite,
MxCommandKind.WriteSecured or
MxCommandKind.WriteSecured2 or
MxCommandKind.AuthenticateUser => GatewayScopes.InvokeSecure,
MxCommandKind.ArchestraUserToId or
MxCommandKind.GetSessionState or
MxCommandKind.GetWorkerInfo => GatewayScopes.MetadataRead,
MxCommandKind.DrainEvents => GatewayScopes.EventsRead,
MxCommandKind.ShutdownWorker => GatewayScopes.Admin,
_ => GatewayScopes.InvokeRead
};
}
}
@@ -0,0 +1,38 @@
using MxGateway.Server.Security.Authentication;
namespace MxGateway.Server.Security.Authorization;
public sealed class GatewayRequestIdentityAccessor : IGatewayRequestIdentityAccessor
{
private readonly AsyncLocal<ApiKeyIdentity?> currentIdentity = new();
public ApiKeyIdentity? Current => currentIdentity.Value;
public IDisposable Push(ApiKeyIdentity identity)
{
ArgumentNullException.ThrowIfNull(identity);
ApiKeyIdentity? previousIdentity = currentIdentity.Value;
currentIdentity.Value = identity;
return new IdentityScope(this, previousIdentity);
}
private sealed class IdentityScope(
GatewayRequestIdentityAccessor accessor,
ApiKeyIdentity? previousIdentity) : IDisposable
{
private bool disposed;
public void Dispose()
{
if (disposed)
{
return;
}
accessor.currentIdentity.Value = previousIdentity;
disposed = true;
}
}
}
@@ -0,0 +1,13 @@
namespace MxGateway.Server.Security.Authorization;
public static class GatewayScopes
{
public const string SessionOpen = "session:open";
public const string SessionClose = "session:close";
public const string InvokeRead = "invoke:read";
public const string InvokeWrite = "invoke:write";
public const string InvokeSecure = "invoke:secure";
public const string EventsRead = "events:read";
public const string MetadataRead = "metadata:read";
public const string Admin = "admin";
}
@@ -0,0 +1,16 @@
using Grpc.Core.Interceptors;
namespace MxGateway.Server.Security.Authorization;
public static class GrpcAuthorizationServiceCollectionExtensions
{
public static IServiceCollection AddGatewayGrpcAuthorization(this IServiceCollection services)
{
services.AddSingleton<GatewayGrpcScopeResolver>();
services.AddSingleton<IGatewayRequestIdentityAccessor, GatewayRequestIdentityAccessor>();
services.AddSingleton<GatewayGrpcAuthorizationInterceptor>();
services.AddGrpc(options => options.Interceptors.Add<GatewayGrpcAuthorizationInterceptor>());
return services;
}
}
@@ -0,0 +1,10 @@
using MxGateway.Server.Security.Authentication;
namespace MxGateway.Server.Security.Authorization;
public interface IGatewayRequestIdentityAccessor
{
ApiKeyIdentity? Current { get; }
IDisposable Push(ApiKeyIdentity identity);
}
@@ -0,0 +1,290 @@
using MxGateway.Contracts.Proto;
using MxGateway.Server.Workers;
namespace MxGateway.Server.Sessions;
public sealed class GatewaySession
{
private readonly object _syncRoot = new();
private readonly SemaphoreSlim _closeLock = new(1, 1);
private IWorkerClient? _workerClient;
private SessionState _state = SessionState.Creating;
private string? _finalFault;
private DateTimeOffset _lastClientActivityAt;
private DateTimeOffset? _leaseExpiresAt;
private bool _closeStarted;
public GatewaySession(
string sessionId,
string backendName,
string pipeName,
string nonce,
string? clientIdentity,
string? clientSessionName,
string? clientCorrelationId,
TimeSpan commandTimeout,
TimeSpan startupTimeout,
TimeSpan shutdownTimeout,
DateTimeOffset openedAt)
{
if (string.IsNullOrWhiteSpace(sessionId))
{
throw new ArgumentException("Session id is required.", nameof(sessionId));
}
if (string.IsNullOrWhiteSpace(backendName))
{
throw new ArgumentException("Backend name is required.", nameof(backendName));
}
if (string.IsNullOrWhiteSpace(pipeName))
{
throw new ArgumentException("Pipe name is required.", nameof(pipeName));
}
if (string.IsNullOrWhiteSpace(nonce))
{
throw new ArgumentException("Nonce is required.", nameof(nonce));
}
SessionId = sessionId;
BackendName = backendName;
PipeName = pipeName;
Nonce = nonce;
ClientIdentity = clientIdentity;
ClientSessionName = clientSessionName;
ClientCorrelationId = clientCorrelationId;
CommandTimeout = commandTimeout;
StartupTimeout = startupTimeout;
ShutdownTimeout = shutdownTimeout;
OpenedAt = openedAt;
_lastClientActivityAt = openedAt;
}
public string SessionId { get; }
public string BackendName { get; }
public string PipeName { get; }
public string Nonce { get; }
public string? ClientIdentity { get; }
public string? ClientSessionName { get; }
public string? ClientCorrelationId { get; }
public TimeSpan CommandTimeout { get; }
public TimeSpan StartupTimeout { get; }
public TimeSpan ShutdownTimeout { get; }
public DateTimeOffset OpenedAt { get; }
public int? WorkerProcessId => _workerClient?.ProcessId;
public IWorkerClient? WorkerClient => _workerClient;
public SessionState State
{
get
{
lock (_syncRoot)
{
return _state;
}
}
}
public DateTimeOffset LastClientActivityAt
{
get
{
lock (_syncRoot)
{
return _lastClientActivityAt;
}
}
}
public DateTimeOffset? LeaseExpiresAt
{
get
{
lock (_syncRoot)
{
return _leaseExpiresAt;
}
}
}
public string? FinalFault
{
get
{
lock (_syncRoot)
{
return _finalFault;
}
}
}
public void AttachWorkerClient(IWorkerClient workerClient)
{
ArgumentNullException.ThrowIfNull(workerClient);
lock (_syncRoot)
{
_workerClient = workerClient;
}
}
public void TransitionTo(SessionState nextState)
{
lock (_syncRoot)
{
if (_state is SessionState.Closed)
{
return;
}
if (_state is SessionState.Faulted && nextState is not SessionState.Closed)
{
return;
}
_state = nextState;
}
}
public void MarkReady()
{
TransitionTo(SessionState.Ready);
}
public void MarkFaulted(string reason)
{
lock (_syncRoot)
{
if (_state is SessionState.Closed)
{
return;
}
_finalFault = reason;
_state = SessionState.Faulted;
}
}
public void TouchClientActivity(DateTimeOffset activityAt)
{
lock (_syncRoot)
{
_lastClientActivityAt = activityAt;
}
}
public void ExtendLease(DateTimeOffset leaseExpiresAt)
{
lock (_syncRoot)
{
_leaseExpiresAt = leaseExpiresAt;
}
}
public bool IsLeaseExpired(DateTimeOffset now)
{
lock (_syncRoot)
{
return _leaseExpiresAt is not null && _leaseExpiresAt <= now;
}
}
public async Task<WorkerCommandReply> InvokeAsync(
WorkerCommand command,
CancellationToken cancellationToken)
{
IWorkerClient workerClient = GetReadyWorkerClient();
TouchClientActivity(DateTimeOffset.UtcNow);
return await workerClient.InvokeAsync(command, CommandTimeout, cancellationToken).ConfigureAwait(false);
}
public IAsyncEnumerable<WorkerEvent> ReadEventsAsync(CancellationToken cancellationToken)
{
IWorkerClient workerClient = GetReadyWorkerClient();
TouchClientActivity(DateTimeOffset.UtcNow);
return workerClient.ReadEventsAsync(cancellationToken);
}
public async Task<SessionCloseResult> CloseAsync(
string reason,
CancellationToken cancellationToken)
{
await _closeLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
if (_state is SessionState.Closed)
{
return new SessionCloseResult(SessionId, SessionState.Closed, AlreadyClosed: true);
}
bool alreadyClosing = _closeStarted;
_closeStarted = true;
_state = SessionState.Closing;
if (_workerClient is not null)
{
try
{
await _workerClient.ShutdownAsync(ShutdownTimeout, cancellationToken).ConfigureAwait(false);
}
catch
{
_workerClient.Kill(reason);
throw;
}
}
_state = SessionState.Closed;
return new SessionCloseResult(SessionId, SessionState.Closed, alreadyClosing);
}
finally
{
_closeLock.Release();
}
}
public void KillWorker(string reason)
{
_workerClient?.Kill(reason);
TransitionTo(SessionState.Closed);
}
public async ValueTask DisposeAsync()
{
_closeLock.Dispose();
if (_workerClient is not null)
{
await _workerClient.DisposeAsync().ConfigureAwait(false);
}
}
private IWorkerClient GetReadyWorkerClient()
{
lock (_syncRoot)
{
if (_state != SessionState.Ready || _workerClient?.State != WorkerClientState.Ready)
{
throw new SessionManagerException(
SessionManagerErrorCode.SessionNotReady,
$"Session {SessionId} is not ready. Current state is {_state}.");
}
return _workerClient;
}
}
}
@@ -0,0 +1,34 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Sessions;
public interface ISessionManager
{
Task<GatewaySession> OpenSessionAsync(
SessionOpenRequest request,
string? clientIdentity,
CancellationToken cancellationToken);
bool TryGetSession(
string sessionId,
out GatewaySession session);
Task<WorkerCommandReply> InvokeAsync(
string sessionId,
WorkerCommand command,
CancellationToken cancellationToken);
IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
string sessionId,
CancellationToken cancellationToken);
Task<SessionCloseResult> CloseSessionAsync(
string sessionId,
CancellationToken cancellationToken);
Task<int> CloseExpiredLeasesAsync(
DateTimeOffset now,
CancellationToken cancellationToken);
Task ShutdownAsync(CancellationToken cancellationToken);
}
@@ -0,0 +1,16 @@
namespace MxGateway.Server.Sessions;
public interface ISessionRegistry
{
int Count { get; }
int ActiveCount { get; }
bool TryAdd(GatewaySession session);
bool TryGet(string sessionId, out GatewaySession session);
bool TryRemove(string sessionId, out GatewaySession session);
IReadOnlyCollection<GatewaySession> Snapshot();
}
@@ -0,0 +1,8 @@
namespace MxGateway.Server.Sessions;
public interface ISessionWorkerClientFactory
{
Task<MxGateway.Server.Workers.IWorkerClient> CreateAsync(
GatewaySession session,
CancellationToken cancellationToken);
}
@@ -0,0 +1,8 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Sessions;
public sealed record SessionCloseResult(
string SessionId,
SessionState FinalState,
bool AlreadyClosed);
@@ -0,0 +1,287 @@
using System.Security.Cryptography;
using Google.Protobuf.WellKnownTypes;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Configuration;
using MxGateway.Server.Metrics;
using MxGateway.Server.Workers;
namespace MxGateway.Server.Sessions;
public sealed class SessionManager : ISessionManager
{
public const string DefaultCloseReason = "client-close";
public const string GatewayShutdownReason = "gateway-shutdown";
public const string LeaseExpiredReason = "lease-expired";
private readonly ISessionRegistry _registry;
private readonly ISessionWorkerClientFactory _workerClientFactory;
private readonly GatewayMetrics _metrics;
private readonly TimeProvider _timeProvider;
private readonly ILogger<SessionManager> _logger;
private readonly GatewayOptions _options;
public SessionManager(
ISessionRegistry registry,
ISessionWorkerClientFactory workerClientFactory,
IOptions<GatewayOptions> options,
GatewayMetrics metrics,
TimeProvider? timeProvider = null,
ILogger<SessionManager>? logger = null)
{
_registry = registry ?? throw new ArgumentNullException(nameof(registry));
_workerClientFactory = workerClientFactory ?? throw new ArgumentNullException(nameof(workerClientFactory));
ArgumentNullException.ThrowIfNull(options);
_metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
_timeProvider = timeProvider ?? TimeProvider.System;
_logger = logger ?? NullLogger<SessionManager>.Instance;
_options = options.Value;
}
public async Task<GatewaySession> OpenSessionAsync(
SessionOpenRequest request,
string? clientIdentity,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(request);
EnsureSessionCapacity();
GatewaySession session = CreateSession(request, clientIdentity);
if (!_registry.TryAdd(session))
{
throw new SessionManagerException(
SessionManagerErrorCode.OpenFailed,
$"Session id collision while opening session {session.SessionId}.");
}
try
{
session.TransitionTo(SessionState.StartingWorker);
IWorkerClient workerClient = await _workerClientFactory
.CreateAsync(session, cancellationToken)
.ConfigureAwait(false);
session.AttachWorkerClient(workerClient);
session.MarkReady();
_metrics.SessionOpened();
return session;
}
catch (Exception exception)
{
session.MarkFaulted(exception.Message);
_registry.TryRemove(session.SessionId, out _);
await session.DisposeAsync().ConfigureAwait(false);
_metrics.Fault(SessionManagerErrorCode.OpenFailed.ToString());
_logger.LogWarning(
exception,
"Failed to open gateway session {SessionId}.",
session.SessionId);
throw new SessionManagerException(
SessionManagerErrorCode.OpenFailed,
$"Failed to open session {session.SessionId}.",
exception);
}
}
public bool TryGetSession(
string sessionId,
out GatewaySession session)
{
return _registry.TryGet(sessionId, out session);
}
public async Task<WorkerCommandReply> InvokeAsync(
string sessionId,
WorkerCommand command,
CancellationToken cancellationToken)
{
GatewaySession session = GetRequiredSession(sessionId);
try
{
return await session.InvokeAsync(command, cancellationToken).ConfigureAwait(false);
}
catch (SessionManagerException)
{
throw;
}
catch (Exception exception)
{
if (session.WorkerClient?.State == WorkerClientState.Faulted)
{
session.MarkFaulted(exception.Message);
}
throw;
}
}
public IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
string sessionId,
CancellationToken cancellationToken)
{
GatewaySession session = GetRequiredSession(sessionId);
return session.ReadEventsAsync(cancellationToken);
}
public async Task<SessionCloseResult> CloseSessionAsync(
string sessionId,
CancellationToken cancellationToken)
{
GatewaySession session = GetRequiredSession(sessionId);
SessionCloseResult result = await CloseSessionCoreAsync(
session,
DefaultCloseReason,
cancellationToken).ConfigureAwait(false);
return result;
}
public async Task<int> CloseExpiredLeasesAsync(
DateTimeOffset now,
CancellationToken cancellationToken)
{
int closedCount = 0;
foreach (GatewaySession session in _registry.Snapshot())
{
if (!session.IsLeaseExpired(now))
{
continue;
}
await CloseSessionCoreAsync(session, LeaseExpiredReason, cancellationToken).ConfigureAwait(false);
closedCount++;
}
return closedCount;
}
public async Task ShutdownAsync(CancellationToken cancellationToken)
{
foreach (GatewaySession session in _registry.Snapshot())
{
try
{
await CloseSessionCoreAsync(session, GatewayShutdownReason, cancellationToken).ConfigureAwait(false);
}
catch (Exception exception)
{
_logger.LogWarning(
exception,
"Graceful shutdown failed for session {SessionId}; killing worker.",
session.SessionId);
session.KillWorker(GatewayShutdownReason);
}
}
}
private async Task<SessionCloseResult> CloseSessionCoreAsync(
GatewaySession session,
string reason,
CancellationToken cancellationToken)
{
bool wasClosed = session.State == SessionState.Closed;
try
{
SessionCloseResult result = await session.CloseAsync(reason, cancellationToken).ConfigureAwait(false);
if (!wasClosed && !result.AlreadyClosed)
{
_metrics.SessionClosed();
}
return result;
}
catch (Exception exception)
{
session.MarkFaulted(exception.Message);
_metrics.Fault(SessionManagerErrorCode.CloseFailed.ToString());
throw new SessionManagerException(
SessionManagerErrorCode.CloseFailed,
$"Failed to close session {session.SessionId}.",
exception);
}
}
private GatewaySession GetRequiredSession(string sessionId)
{
if (!_registry.TryGet(sessionId, out GatewaySession session))
{
throw new SessionManagerException(
SessionManagerErrorCode.SessionNotFound,
$"Session {sessionId} was not found.");
}
return session;
}
private void EnsureSessionCapacity()
{
if (_registry.ActiveCount >= _options.Sessions.MaxSessions)
{
throw new SessionManagerException(
SessionManagerErrorCode.SessionLimitExceeded,
$"Gateway session limit {_options.Sessions.MaxSessions} has been reached.");
}
}
private GatewaySession CreateSession(
SessionOpenRequest request,
string? clientIdentity)
{
string sessionId = CreateSessionId();
string backendName = string.IsNullOrWhiteSpace(request.RequestedBackend)
? GatewayContractInfo.DefaultBackendName
: request.RequestedBackend!;
TimeSpan commandTimeout = ResolveCommandTimeout(request.CommandTimeout);
TimeSpan startupTimeout = TimeSpan.FromSeconds(_options.Worker.StartupTimeoutSeconds);
TimeSpan shutdownTimeout = TimeSpan.FromSeconds(_options.Worker.ShutdownTimeoutSeconds);
string pipeName = $"mxaccess-gateway-{Environment.ProcessId}-{sessionId}";
string nonce = CreateNonce();
DateTimeOffset openedAt = _timeProvider.GetUtcNow();
return new GatewaySession(
sessionId,
backendName,
pipeName,
nonce,
clientIdentity,
request.ClientSessionName,
request.ClientCorrelationId,
commandTimeout,
startupTimeout,
shutdownTimeout,
openedAt);
}
private TimeSpan ResolveCommandTimeout(Duration? requestedTimeout)
{
if (requestedTimeout is null)
{
return TimeSpan.FromSeconds(_options.Sessions.DefaultCommandTimeoutSeconds);
}
TimeSpan timeout = requestedTimeout.ToTimeSpan();
return timeout <= TimeSpan.Zero
? TimeSpan.FromSeconds(_options.Sessions.DefaultCommandTimeoutSeconds)
: timeout;
}
private static string CreateSessionId()
{
return $"session-{Guid.NewGuid():N}";
}
private static string CreateNonce()
{
Span<byte> bytes = stackalloc byte[32];
RandomNumberGenerator.Fill(bytes);
return Convert.ToBase64String(bytes);
}
}
@@ -0,0 +1,10 @@
namespace MxGateway.Server.Sessions;
public enum SessionManagerErrorCode
{
SessionNotFound,
SessionNotReady,
SessionLimitExceeded,
OpenFailed,
CloseFailed,
}
@@ -0,0 +1,23 @@
namespace MxGateway.Server.Sessions;
public sealed class SessionManagerException : Exception
{
public SessionManagerException(
SessionManagerErrorCode errorCode,
string message)
: base(message)
{
ErrorCode = errorCode;
}
public SessionManagerException(
SessionManagerErrorCode errorCode,
string message,
Exception innerException)
: base(message, innerException)
{
ErrorCode = errorCode;
}
public SessionManagerErrorCode ErrorCode { get; }
}
@@ -0,0 +1,22 @@
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Sessions;
public sealed record SessionOpenRequest(
string? RequestedBackend,
string? ClientSessionName,
string? ClientCorrelationId,
Duration? CommandTimeout)
{
public static SessionOpenRequest FromContract(OpenSessionRequest request)
{
ArgumentNullException.ThrowIfNull(request);
return new SessionOpenRequest(
request.RequestedBackend,
request.ClientSessionName,
request.ClientCorrelationId,
request.CommandTimeout);
}
}
@@ -0,0 +1,39 @@
using System.Collections.Concurrent;
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Sessions;
public sealed class SessionRegistry : ISessionRegistry
{
private readonly ConcurrentDictionary<string, GatewaySession> _sessions = new(StringComparer.Ordinal);
public int Count => _sessions.Count;
public int ActiveCount => _sessions.Values.Count(session => session.State is not SessionState.Closed);
public bool TryAdd(GatewaySession session)
{
ArgumentNullException.ThrowIfNull(session);
return _sessions.TryAdd(session.SessionId, session);
}
public bool TryGet(
string sessionId,
out GatewaySession session)
{
return _sessions.TryGetValue(sessionId, out session!);
}
public bool TryRemove(
string sessionId,
out GatewaySession session)
{
return _sessions.TryRemove(sessionId, out session!);
}
public IReadOnlyCollection<GatewaySession> Snapshot()
{
return _sessions.Values.ToArray();
}
}
@@ -0,0 +1,13 @@
namespace MxGateway.Server.Sessions;
public static class SessionServiceCollectionExtensions
{
public static IServiceCollection AddGatewaySessions(this IServiceCollection services)
{
services.AddSingleton<ISessionRegistry, SessionRegistry>();
services.AddSingleton<ISessionWorkerClientFactory, SessionWorkerClientFactory>();
services.AddSingleton<ISessionManager, SessionManager>();
return services;
}
}
@@ -0,0 +1,144 @@
using System.IO.Pipes;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Configuration;
using MxGateway.Server.Metrics;
using MxGateway.Server.Workers;
namespace MxGateway.Server.Sessions;
public sealed class SessionWorkerClientFactory : ISessionWorkerClientFactory
{
private readonly IWorkerProcessLauncher _workerProcessLauncher;
private readonly GatewayMetrics _metrics;
private readonly TimeProvider _timeProvider;
private readonly ILoggerFactory _loggerFactory;
private readonly GatewayOptions _options;
public SessionWorkerClientFactory(
IWorkerProcessLauncher workerProcessLauncher,
IOptions<GatewayOptions> options,
GatewayMetrics metrics,
ILoggerFactory loggerFactory,
TimeProvider? timeProvider = null)
{
_workerProcessLauncher = workerProcessLauncher ?? throw new ArgumentNullException(nameof(workerProcessLauncher));
ArgumentNullException.ThrowIfNull(options);
_metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
_loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
_timeProvider = timeProvider ?? TimeProvider.System;
_options = options.Value;
}
public async Task<IWorkerClient> CreateAsync(
GatewaySession session,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(session);
NamedPipeServerStream? pipe = CreatePipe(session.PipeName);
WorkerProcessHandle? processHandle = null;
IWorkerClient? workerClient = null;
try
{
session.TransitionTo(SessionState.StartingWorker);
processHandle = await _workerProcessLauncher
.LaunchAsync(
new WorkerProcessLaunchRequest(
session.SessionId,
session.PipeName,
GatewayContractInfo.WorkerProtocolVersion,
session.Nonce,
pipe),
cancellationToken)
.ConfigureAwait(false);
session.TransitionTo(SessionState.WaitingForPipe);
await WaitForPipeConnectionAsync(pipe, session.StartupTimeout, cancellationToken).ConfigureAwait(false);
session.TransitionTo(SessionState.Handshaking);
WorkerFrameProtocolOptions frameOptions = new(
session.SessionId,
GatewayContractInfo.WorkerProtocolVersion,
_options.Worker.MaxMessageBytes);
WorkerClientConnection connection = new(
session.SessionId,
session.Nonce,
pipe,
frameOptions,
processHandle);
WorkerClientOptions clientOptions = new()
{
HeartbeatGrace = TimeSpan.FromSeconds(_options.Worker.HeartbeatGraceSeconds),
HeartbeatCheckInterval = TimeSpan.FromSeconds(_options.Worker.HeartbeatIntervalSeconds),
EventChannelCapacity = _options.Events.QueueCapacity,
};
workerClient = new WorkerClient(
connection,
clientOptions,
_metrics,
_timeProvider,
_loggerFactory.CreateLogger<WorkerClient>());
pipe = null;
processHandle = null;
session.TransitionTo(SessionState.InitializingWorker);
await workerClient.StartAsync(cancellationToken).ConfigureAwait(false);
return workerClient;
}
catch
{
if (workerClient is not null)
{
await workerClient.DisposeAsync().ConfigureAwait(false);
}
else
{
if (processHandle is not null)
{
try
{
if (!processHandle.Process.HasExited)
{
processHandle.Process.Kill(entireProcessTree: true);
_metrics.WorkerKilled("OpenSessionFailed");
}
}
finally
{
processHandle.Dispose();
}
}
pipe?.Dispose();
}
throw;
}
}
private static NamedPipeServerStream CreatePipe(string pipeName)
{
return new NamedPipeServerStream(
pipeName,
PipeDirection.InOut,
maxNumberOfServerInstances: 1,
PipeTransmissionMode.Byte,
PipeOptions.Asynchronous);
}
private static async Task WaitForPipeConnectionAsync(
NamedPipeServerStream pipe,
TimeSpan startupTimeout,
CancellationToken cancellationToken)
{
using CancellationTokenSource timeout = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
timeout.CancelAfter(startupTimeout);
await pipe.WaitForConnectionAsync(timeout.Token).ConfigureAwait(false);
}
}
@@ -0,0 +1,27 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Workers;
public interface IWorkerClient : IAsyncDisposable
{
string SessionId { get; }
int? ProcessId { get; }
WorkerClientState State { get; }
DateTimeOffset LastHeartbeatAt { get; }
Task StartAsync(CancellationToken cancellationToken);
Task<WorkerCommandReply> InvokeAsync(
WorkerCommand command,
TimeSpan timeout,
CancellationToken cancellationToken);
IAsyncEnumerable<WorkerEvent> ReadEventsAsync(CancellationToken cancellationToken);
Task ShutdownAsync(TimeSpan timeout, CancellationToken cancellationToken);
void Kill(string reason);
}
@@ -0,0 +1,14 @@
namespace MxGateway.Server.Workers;
public interface IWorkerProcess : IDisposable
{
int Id { get; }
bool HasExited { get; }
int? ExitCode { get; }
ValueTask WaitForExitAsync(CancellationToken cancellationToken);
void Kill(bool entireProcessTree);
}
@@ -0,0 +1,8 @@
using System.Diagnostics;
namespace MxGateway.Server.Workers;
public interface IWorkerProcessFactory
{
IWorkerProcess Start(ProcessStartInfo startInfo);
}
@@ -0,0 +1,8 @@
namespace MxGateway.Server.Workers;
public interface IWorkerProcessLauncher
{
Task<WorkerProcessHandle> LaunchAsync(
WorkerProcessLaunchRequest request,
CancellationToken cancellationToken = default);
}
@@ -0,0 +1,9 @@
namespace MxGateway.Server.Workers;
public interface IWorkerStartupProbe
{
Task WaitUntilReadyAsync(
IWorkerProcess process,
WorkerProcessLaunchRequest request,
CancellationToken cancellationToken);
}
@@ -0,0 +1,27 @@
using System.Diagnostics;
namespace MxGateway.Server.Workers;
internal sealed class SystemWorkerProcess(Process process) : IWorkerProcess
{
public int Id => process.Id;
public bool HasExited => process.HasExited;
public int? ExitCode => process.HasExited ? process.ExitCode : null;
public async ValueTask WaitForExitAsync(CancellationToken cancellationToken)
{
await process.WaitForExitAsync(cancellationToken).ConfigureAwait(false);
}
public void Kill(bool entireProcessTree)
{
process.Kill(entireProcessTree);
}
public void Dispose()
{
process.Dispose();
}
}
@@ -0,0 +1,22 @@
using System.Diagnostics;
namespace MxGateway.Server.Workers;
public sealed class SystemWorkerProcessFactory : IWorkerProcessFactory
{
public IWorkerProcess Start(ProcessStartInfo startInfo)
{
Process process = new()
{
StartInfo = startInfo,
};
if (!process.Start())
{
process.Dispose();
throw new InvalidOperationException("Worker process failed to start.");
}
return new SystemWorkerProcess(process);
}
}

Some files were not shown because too many files have changed in this diff Show More