Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| ec1155de6d | |||
| 0c539834dc | |||
| a5098e6815 | |||
| 41ddd122a6 |
@@ -0,0 +1,54 @@
|
|||||||
|
# Worker Frame Protocol
|
||||||
|
|
||||||
|
The gateway uses the worker frame protocol to move `WorkerEnvelope` protobuf
|
||||||
|
messages over a bidirectional named pipe. The frame layer is deliberately small:
|
||||||
|
it handles message boundaries, size limits, protobuf parsing, and envelope
|
||||||
|
validation before higher-level worker client code routes commands, replies,
|
||||||
|
events, and faults.
|
||||||
|
|
||||||
|
## Frame Format
|
||||||
|
|
||||||
|
Each frame starts with a four-byte little-endian unsigned payload length,
|
||||||
|
followed by the serialized `WorkerEnvelope` payload:
|
||||||
|
|
||||||
|
```text
|
||||||
|
uint32 little-endian payload_length
|
||||||
|
payload_length bytes protobuf WorkerEnvelope
|
||||||
|
```
|
||||||
|
|
||||||
|
The reader rejects zero-length payloads and payloads larger than the configured
|
||||||
|
maximum before allocating the payload buffer. The default maximum is 16 MiB,
|
||||||
|
matching the gateway process design.
|
||||||
|
|
||||||
|
## Envelope Validation
|
||||||
|
|
||||||
|
`WorkerFrameReader` and `WorkerFrameWriter` validate each envelope against the
|
||||||
|
owning session before returning or writing it:
|
||||||
|
|
||||||
|
- `protocol_version` must match the configured worker protocol version,
|
||||||
|
- `session_id` must match the owning gateway session,
|
||||||
|
- the envelope must contain one typed `body` value.
|
||||||
|
|
||||||
|
Protocol violations throw `WorkerFrameProtocolException` with a
|
||||||
|
`WorkerFrameProtocolErrorCode` so callers can distinguish malformed frames,
|
||||||
|
oversized frames, protocol version mismatches, and session mismatches.
|
||||||
|
|
||||||
|
## Verification
|
||||||
|
|
||||||
|
Run the focused tests after changing the frame protocol:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter WorkerFrameProtocolTests
|
||||||
|
```
|
||||||
|
|
||||||
|
Run the gateway build because the frame protocol is part of
|
||||||
|
`MxGateway.Server`:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
dotnet build src/MxGateway.Server/MxGateway.Server.csproj
|
||||||
|
```
|
||||||
|
|
||||||
|
## Related Documentation
|
||||||
|
|
||||||
|
- [Gateway Process Detailed Design](./gateway-process-design.md)
|
||||||
|
- [Protobuf Contracts](./Contracts.md)
|
||||||
@@ -612,6 +612,23 @@ SQLite auth storage should use startup migrations with a `schema_version` table.
|
|||||||
Migrations should run inside transactions and fail startup if the database
|
Migrations should run inside transactions and fail startup if the database
|
||||||
schema is newer than the running binary understands.
|
schema is newer than the running binary understands.
|
||||||
|
|
||||||
|
The v1 auth store uses `Microsoft.Data.Sqlite` and creates the
|
||||||
|
`schema_version`, `api_keys`, and `api_key_audit` tables through
|
||||||
|
`SqliteAuthStoreMigrator`. `AuthStoreMigrationHostedService` runs those
|
||||||
|
migrations at gateway startup when API-key authentication and
|
||||||
|
`Authentication:RunMigrationsOnStartup` are enabled. A database with a newer
|
||||||
|
schema version fails startup instead of being modified by an older gateway
|
||||||
|
binary.
|
||||||
|
|
||||||
|
`IApiKeyStore` reads stored key records and exposes an active-key lookup that
|
||||||
|
excludes rows with `revoked_utc` set. Hash verification belongs to the API-key
|
||||||
|
hashing layer, but the store preserves the `secret_hash` bytes, display name,
|
||||||
|
scopes, timestamps, and revocation state needed by that layer.
|
||||||
|
|
||||||
|
`IApiKeyAuditStore` appends audit events to `api_key_audit` and returns recent
|
||||||
|
events for diagnostics and future administrative tools. Audit records store key
|
||||||
|
ids and event metadata only; they do not store raw API key secrets.
|
||||||
|
|
||||||
Commands requiring authorization:
|
Commands requiring authorization:
|
||||||
|
|
||||||
- writes,
|
- writes,
|
||||||
|
|||||||
@@ -45,6 +45,8 @@ Detailed follow-up docs:
|
|||||||
- `docs/gateway-process-design.md` covers the .NET 10 gateway process,
|
- `docs/gateway-process-design.md` covers the .NET 10 gateway process,
|
||||||
session manager, worker supervision, gRPC API, event streaming, fault model,
|
session manager, worker supervision, gRPC API, event streaming, fault model,
|
||||||
security, observability, and test strategy.
|
security, observability, and test strategy.
|
||||||
|
- `docs/WorkerFrameProtocol.md` covers the gateway-side named-pipe frame
|
||||||
|
reader/writer and `WorkerEnvelope` validation rules.
|
||||||
- `docs/mxaccess-worker-instance-design.md` covers each .NET Framework 4.8 x86
|
- `docs/mxaccess-worker-instance-design.md` covers each .NET Framework 4.8 x86
|
||||||
MXAccess worker instance, including STA ownership, message pumping, COM
|
MXAccess worker instance, including STA ownership, message pumping, COM
|
||||||
lifetime, command dispatch, event sinks, conversion, and shutdown.
|
lifetime, command dispatch, event sinks, conversion, and shutdown.
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ using MxGateway.Contracts;
|
|||||||
using MxGateway.Server.Configuration;
|
using MxGateway.Server.Configuration;
|
||||||
using MxGateway.Server.Diagnostics;
|
using MxGateway.Server.Diagnostics;
|
||||||
using MxGateway.Server.Metrics;
|
using MxGateway.Server.Metrics;
|
||||||
|
using MxGateway.Server.Security.Authentication;
|
||||||
|
|
||||||
namespace MxGateway.Server;
|
namespace MxGateway.Server;
|
||||||
|
|
||||||
@@ -23,6 +24,7 @@ public static class GatewayApplication
|
|||||||
WebApplicationBuilder builder = WebApplication.CreateBuilder(args);
|
WebApplicationBuilder builder = WebApplication.CreateBuilder(args);
|
||||||
|
|
||||||
builder.Services.AddGatewayConfiguration();
|
builder.Services.AddGatewayConfiguration();
|
||||||
|
builder.Services.AddSqliteAuthStore();
|
||||||
builder.Services.AddHealthChecks();
|
builder.Services.AddHealthChecks();
|
||||||
builder.Services.AddSingleton<GatewayMetrics>();
|
builder.Services.AddSingleton<GatewayMetrics>();
|
||||||
|
|
||||||
|
|||||||
@@ -4,6 +4,10 @@
|
|||||||
<TargetFramework>net10.0</TargetFramework>
|
<TargetFramework>net10.0</TargetFramework>
|
||||||
</PropertyGroup>
|
</PropertyGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<PackageReference Include="Microsoft.Data.Sqlite" Version="10.0.7" />
|
||||||
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<ProjectReference Include="..\MxGateway.Contracts\MxGateway.Contracts.csproj" />
|
<ProjectReference Include="..\MxGateway.Contracts\MxGateway.Contracts.csproj" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|||||||
@@ -0,0 +1,7 @@
|
|||||||
|
namespace MxGateway.Server.Security.Authentication;
|
||||||
|
|
||||||
|
public sealed record ApiKeyAuditEntry(
|
||||||
|
string? KeyId,
|
||||||
|
string EventType,
|
||||||
|
string? RemoteAddress,
|
||||||
|
string? Details);
|
||||||
@@ -0,0 +1,9 @@
|
|||||||
|
namespace MxGateway.Server.Security.Authentication;
|
||||||
|
|
||||||
|
public sealed record ApiKeyAuditRecord(
|
||||||
|
long AuditId,
|
||||||
|
string? KeyId,
|
||||||
|
string EventType,
|
||||||
|
string? RemoteAddress,
|
||||||
|
DateTimeOffset CreatedUtc,
|
||||||
|
string? Details);
|
||||||
@@ -0,0 +1,11 @@
|
|||||||
|
namespace MxGateway.Server.Security.Authentication;
|
||||||
|
|
||||||
|
public sealed record ApiKeyRecord(
|
||||||
|
string KeyId,
|
||||||
|
string KeyPrefix,
|
||||||
|
byte[] SecretHash,
|
||||||
|
string DisplayName,
|
||||||
|
IReadOnlySet<string> Scopes,
|
||||||
|
DateTimeOffset CreatedUtc,
|
||||||
|
DateTimeOffset? LastUsedUtc,
|
||||||
|
DateTimeOffset? RevokedUtc);
|
||||||
@@ -0,0 +1,23 @@
|
|||||||
|
using System.Text.Json;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Security.Authentication;
|
||||||
|
|
||||||
|
public static class ApiKeyScopeSerializer
|
||||||
|
{
|
||||||
|
public static string Serialize(IReadOnlySet<string> scopes)
|
||||||
|
{
|
||||||
|
return JsonSerializer.Serialize(scopes.Order(StringComparer.Ordinal));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static IReadOnlySet<string> Deserialize(string value)
|
||||||
|
{
|
||||||
|
if (string.IsNullOrWhiteSpace(value))
|
||||||
|
{
|
||||||
|
return new HashSet<string>(StringComparer.Ordinal);
|
||||||
|
}
|
||||||
|
|
||||||
|
string[]? scopes = JsonSerializer.Deserialize<string[]>(value);
|
||||||
|
|
||||||
|
return new HashSet<string>(scopes ?? [], StringComparer.Ordinal);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,27 @@
|
|||||||
|
using Microsoft.Data.Sqlite;
|
||||||
|
using Microsoft.Extensions.Options;
|
||||||
|
using MxGateway.Server.Configuration;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Security.Authentication;
|
||||||
|
|
||||||
|
public sealed class AuthSqliteConnectionFactory(IOptions<GatewayOptions> options)
|
||||||
|
{
|
||||||
|
public SqliteConnection CreateConnection()
|
||||||
|
{
|
||||||
|
string sqlitePath = options.Value.Authentication.SqlitePath;
|
||||||
|
string? directory = Path.GetDirectoryName(sqlitePath);
|
||||||
|
|
||||||
|
if (!string.IsNullOrWhiteSpace(directory))
|
||||||
|
{
|
||||||
|
Directory.CreateDirectory(directory);
|
||||||
|
}
|
||||||
|
|
||||||
|
SqliteConnectionStringBuilder builder = new()
|
||||||
|
{
|
||||||
|
DataSource = sqlitePath,
|
||||||
|
Mode = SqliteOpenMode.ReadWriteCreate
|
||||||
|
};
|
||||||
|
|
||||||
|
return new SqliteConnection(builder.ToString());
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,3 @@
|
|||||||
|
namespace MxGateway.Server.Security.Authentication;
|
||||||
|
|
||||||
|
public sealed class AuthStoreMigrationException(string message) : InvalidOperationException(message);
|
||||||
@@ -0,0 +1,24 @@
|
|||||||
|
using Microsoft.Extensions.Options;
|
||||||
|
using MxGateway.Server.Configuration;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Security.Authentication;
|
||||||
|
|
||||||
|
public sealed class AuthStoreMigrationHostedService(
|
||||||
|
IOptions<GatewayOptions> options,
|
||||||
|
IAuthStoreMigrator migrator) : IHostedService
|
||||||
|
{
|
||||||
|
public async Task StartAsync(CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
AuthenticationOptions authentication = options.Value.Authentication;
|
||||||
|
|
||||||
|
if (authentication.Mode == AuthenticationMode.ApiKey && authentication.RunMigrationsOnStartup)
|
||||||
|
{
|
||||||
|
await migrator.MigrateAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task StopAsync(CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,15 @@
|
|||||||
|
namespace MxGateway.Server.Security.Authentication;
|
||||||
|
|
||||||
|
public static class AuthStoreServiceCollectionExtensions
|
||||||
|
{
|
||||||
|
public static IServiceCollection AddSqliteAuthStore(this IServiceCollection services)
|
||||||
|
{
|
||||||
|
services.AddSingleton<AuthSqliteConnectionFactory>();
|
||||||
|
services.AddSingleton<IAuthStoreMigrator, SqliteAuthStoreMigrator>();
|
||||||
|
services.AddSingleton<IApiKeyStore, SqliteApiKeyStore>();
|
||||||
|
services.AddSingleton<IApiKeyAuditStore, SqliteApiKeyAuditStore>();
|
||||||
|
services.AddHostedService<AuthStoreMigrationHostedService>();
|
||||||
|
|
||||||
|
return services;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,8 @@
|
|||||||
|
namespace MxGateway.Server.Security.Authentication;
|
||||||
|
|
||||||
|
public interface IApiKeyAuditStore
|
||||||
|
{
|
||||||
|
Task AppendAsync(ApiKeyAuditEntry entry, CancellationToken cancellationToken);
|
||||||
|
|
||||||
|
Task<IReadOnlyList<ApiKeyAuditRecord>> ListRecentAsync(int count, CancellationToken cancellationToken);
|
||||||
|
}
|
||||||
@@ -0,0 +1,10 @@
|
|||||||
|
namespace MxGateway.Server.Security.Authentication;
|
||||||
|
|
||||||
|
public interface IApiKeyStore
|
||||||
|
{
|
||||||
|
Task<ApiKeyRecord?> FindByKeyIdAsync(string keyId, CancellationToken cancellationToken);
|
||||||
|
|
||||||
|
Task<ApiKeyRecord?> FindActiveByKeyIdAsync(string keyId, CancellationToken cancellationToken);
|
||||||
|
|
||||||
|
Task MarkKeyUsedAsync(string keyId, DateTimeOffset usedUtc, CancellationToken cancellationToken);
|
||||||
|
}
|
||||||
@@ -0,0 +1,6 @@
|
|||||||
|
namespace MxGateway.Server.Security.Authentication;
|
||||||
|
|
||||||
|
public interface IAuthStoreMigrator
|
||||||
|
{
|
||||||
|
Task MigrateAsync(CancellationToken cancellationToken);
|
||||||
|
}
|
||||||
@@ -0,0 +1,65 @@
|
|||||||
|
using Microsoft.Data.Sqlite;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Security.Authentication;
|
||||||
|
|
||||||
|
public sealed class SqliteApiKeyAuditStore(AuthSqliteConnectionFactory connectionFactory) : IApiKeyAuditStore
|
||||||
|
{
|
||||||
|
public async Task AppendAsync(ApiKeyAuditEntry entry, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
await using SqliteConnection connection = connectionFactory.CreateConnection();
|
||||||
|
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
|
||||||
|
await using SqliteCommand command = connection.CreateCommand();
|
||||||
|
command.CommandText = """
|
||||||
|
INSERT INTO api_key_audit (key_id, event_type, remote_address, created_utc, details)
|
||||||
|
VALUES ($key_id, $event_type, $remote_address, $created_utc, $details);
|
||||||
|
""";
|
||||||
|
command.Parameters.AddWithValue("$key_id", (object?)entry.KeyId ?? DBNull.Value);
|
||||||
|
command.Parameters.AddWithValue("$event_type", entry.EventType);
|
||||||
|
command.Parameters.AddWithValue("$remote_address", (object?)entry.RemoteAddress ?? DBNull.Value);
|
||||||
|
command.Parameters.AddWithValue("$created_utc", DateTimeOffset.UtcNow.ToString("O"));
|
||||||
|
command.Parameters.AddWithValue("$details", (object?)entry.Details ?? DBNull.Value);
|
||||||
|
|
||||||
|
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<IReadOnlyList<ApiKeyAuditRecord>> ListRecentAsync(int count, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
if (count <= 0)
|
||||||
|
{
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
|
||||||
|
await using SqliteConnection connection = connectionFactory.CreateConnection();
|
||||||
|
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
|
||||||
|
await using SqliteCommand command = connection.CreateCommand();
|
||||||
|
command.CommandText = """
|
||||||
|
SELECT audit_id, key_id, event_type, remote_address, created_utc, details
|
||||||
|
FROM api_key_audit
|
||||||
|
ORDER BY audit_id DESC
|
||||||
|
LIMIT $count;
|
||||||
|
""";
|
||||||
|
command.Parameters.AddWithValue("$count", count);
|
||||||
|
|
||||||
|
List<ApiKeyAuditRecord> records = [];
|
||||||
|
|
||||||
|
await using SqliteDataReader reader = await command.ExecuteReaderAsync(cancellationToken)
|
||||||
|
.ConfigureAwait(false);
|
||||||
|
|
||||||
|
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
|
||||||
|
{
|
||||||
|
records.Add(new ApiKeyAuditRecord(
|
||||||
|
AuditId: reader.GetInt64(0),
|
||||||
|
KeyId: reader.IsDBNull(1) ? null : reader.GetString(1),
|
||||||
|
EventType: reader.GetString(2),
|
||||||
|
RemoteAddress: reader.IsDBNull(3) ? null : reader.GetString(3),
|
||||||
|
CreatedUtc: DateTimeOffset.Parse(
|
||||||
|
reader.GetString(4),
|
||||||
|
System.Globalization.CultureInfo.InvariantCulture),
|
||||||
|
Details: reader.IsDBNull(5) ? null : reader.GetString(5)));
|
||||||
|
}
|
||||||
|
|
||||||
|
return records;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,86 @@
|
|||||||
|
using Microsoft.Data.Sqlite;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Security.Authentication;
|
||||||
|
|
||||||
|
public sealed class SqliteApiKeyStore(AuthSqliteConnectionFactory connectionFactory) : IApiKeyStore
|
||||||
|
{
|
||||||
|
public Task<ApiKeyRecord?> FindByKeyIdAsync(string keyId, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
return FindByKeyIdAsync(keyId, requireActive: false, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<ApiKeyRecord?> FindActiveByKeyIdAsync(string keyId, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
return FindByKeyIdAsync(keyId, requireActive: true, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task MarkKeyUsedAsync(string keyId, DateTimeOffset usedUtc, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
await using SqliteConnection connection = connectionFactory.CreateConnection();
|
||||||
|
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
|
||||||
|
await using SqliteCommand command = connection.CreateCommand();
|
||||||
|
command.CommandText = """
|
||||||
|
UPDATE api_keys
|
||||||
|
SET last_used_utc = $last_used_utc
|
||||||
|
WHERE key_id = $key_id AND revoked_utc IS NULL;
|
||||||
|
""";
|
||||||
|
command.Parameters.AddWithValue("$key_id", keyId);
|
||||||
|
command.Parameters.AddWithValue("$last_used_utc", usedUtc.ToString("O"));
|
||||||
|
|
||||||
|
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task<ApiKeyRecord?> FindByKeyIdAsync(
|
||||||
|
string keyId,
|
||||||
|
bool requireActive,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
await using SqliteConnection connection = connectionFactory.CreateConnection();
|
||||||
|
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
|
||||||
|
await using SqliteCommand command = connection.CreateCommand();
|
||||||
|
command.CommandText = requireActive
|
||||||
|
? """
|
||||||
|
SELECT key_id, key_prefix, secret_hash, display_name, scopes, created_utc, last_used_utc, revoked_utc
|
||||||
|
FROM api_keys
|
||||||
|
WHERE key_id = $key_id AND revoked_utc IS NULL;
|
||||||
|
"""
|
||||||
|
: """
|
||||||
|
SELECT key_id, key_prefix, secret_hash, display_name, scopes, created_utc, last_used_utc, revoked_utc
|
||||||
|
FROM api_keys
|
||||||
|
WHERE key_id = $key_id;
|
||||||
|
""";
|
||||||
|
command.Parameters.AddWithValue("$key_id", keyId);
|
||||||
|
|
||||||
|
await using SqliteDataReader reader = await command.ExecuteReaderAsync(cancellationToken)
|
||||||
|
.ConfigureAwait(false);
|
||||||
|
|
||||||
|
if (!await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return ReadApiKeyRecord(reader);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ApiKeyRecord ReadApiKeyRecord(SqliteDataReader reader)
|
||||||
|
{
|
||||||
|
return new ApiKeyRecord(
|
||||||
|
KeyId: reader.GetString(0),
|
||||||
|
KeyPrefix: reader.GetString(1),
|
||||||
|
SecretHash: (byte[])reader["secret_hash"],
|
||||||
|
DisplayName: reader.GetString(3),
|
||||||
|
Scopes: ApiKeyScopeSerializer.Deserialize(reader.GetString(4)),
|
||||||
|
CreatedUtc: DateTimeOffset.Parse(reader.GetString(5), System.Globalization.CultureInfo.InvariantCulture),
|
||||||
|
LastUsedUtc: ReadNullableDateTimeOffset(reader, 6),
|
||||||
|
RevokedUtc: ReadNullableDateTimeOffset(reader, 7));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static DateTimeOffset? ReadNullableDateTimeOffset(SqliteDataReader reader, int ordinal)
|
||||||
|
{
|
||||||
|
return reader.IsDBNull(ordinal)
|
||||||
|
? null
|
||||||
|
: DateTimeOffset.Parse(reader.GetString(ordinal), System.Globalization.CultureInfo.InvariantCulture);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,12 @@
|
|||||||
|
namespace MxGateway.Server.Security.Authentication;
|
||||||
|
|
||||||
|
public static class SqliteAuthSchema
|
||||||
|
{
|
||||||
|
public const int CurrentVersion = 1;
|
||||||
|
|
||||||
|
public const string SchemaVersionTable = "schema_version";
|
||||||
|
|
||||||
|
public const string ApiKeysTable = "api_keys";
|
||||||
|
|
||||||
|
public const string ApiKeyAuditTable = "api_key_audit";
|
||||||
|
}
|
||||||
@@ -0,0 +1,135 @@
|
|||||||
|
using Microsoft.Data.Sqlite;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Security.Authentication;
|
||||||
|
|
||||||
|
public sealed class SqliteAuthStoreMigrator(AuthSqliteConnectionFactory connectionFactory) : IAuthStoreMigrator
|
||||||
|
{
|
||||||
|
public async Task MigrateAsync(CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
await using SqliteConnection connection = connectionFactory.CreateConnection();
|
||||||
|
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
|
||||||
|
await using SqliteTransaction transaction =
|
||||||
|
(SqliteTransaction)await connection.BeginTransactionAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
|
||||||
|
int existingVersion = await ReadExistingSchemaVersionAsync(connection, transaction, cancellationToken)
|
||||||
|
.ConfigureAwait(false);
|
||||||
|
|
||||||
|
if (existingVersion > SqliteAuthSchema.CurrentVersion)
|
||||||
|
{
|
||||||
|
throw new AuthStoreMigrationException(
|
||||||
|
$"Auth database schema version {existingVersion} is newer than supported version {SqliteAuthSchema.CurrentVersion}.");
|
||||||
|
}
|
||||||
|
|
||||||
|
await ApplyVersionOneAsync(connection, transaction, cancellationToken).ConfigureAwait(false);
|
||||||
|
|
||||||
|
await transaction.CommitAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static async Task<int> ReadExistingSchemaVersionAsync(
|
||||||
|
SqliteConnection connection,
|
||||||
|
SqliteTransaction transaction,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
await using SqliteCommand tableExistsCommand = connection.CreateCommand();
|
||||||
|
tableExistsCommand.Transaction = transaction;
|
||||||
|
tableExistsCommand.CommandText = """
|
||||||
|
SELECT COUNT(*)
|
||||||
|
FROM sqlite_master
|
||||||
|
WHERE type = 'table' AND name = $table_name;
|
||||||
|
""";
|
||||||
|
tableExistsCommand.Parameters.AddWithValue("$table_name", SqliteAuthSchema.SchemaVersionTable);
|
||||||
|
|
||||||
|
long tableCount = (long)(await tableExistsCommand.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false) ?? 0L);
|
||||||
|
|
||||||
|
if (tableCount == 0)
|
||||||
|
{
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
await using SqliteCommand versionCommand = connection.CreateCommand();
|
||||||
|
versionCommand.Transaction = transaction;
|
||||||
|
versionCommand.CommandText = """
|
||||||
|
SELECT version
|
||||||
|
FROM schema_version
|
||||||
|
WHERE id = 1;
|
||||||
|
""";
|
||||||
|
|
||||||
|
object? version = await versionCommand.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
|
||||||
|
return version is null || version == DBNull.Value
|
||||||
|
? 0
|
||||||
|
: Convert.ToInt32(version, System.Globalization.CultureInfo.InvariantCulture);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static async Task ApplyVersionOneAsync(
|
||||||
|
SqliteConnection connection,
|
||||||
|
SqliteTransaction transaction,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
await ExecuteNonQueryAsync(
|
||||||
|
connection,
|
||||||
|
transaction,
|
||||||
|
"""
|
||||||
|
CREATE TABLE IF NOT EXISTS schema_version (
|
||||||
|
id INTEGER PRIMARY KEY CHECK (id = 1),
|
||||||
|
version INTEGER NOT NULL,
|
||||||
|
applied_utc TEXT NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS api_keys (
|
||||||
|
key_id TEXT PRIMARY KEY,
|
||||||
|
key_prefix TEXT NOT NULL,
|
||||||
|
secret_hash BLOB NOT NULL,
|
||||||
|
display_name TEXT NOT NULL,
|
||||||
|
scopes TEXT NOT NULL,
|
||||||
|
created_utc TEXT NOT NULL,
|
||||||
|
last_used_utc TEXT NULL,
|
||||||
|
revoked_utc TEXT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS api_key_audit (
|
||||||
|
audit_id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
key_id TEXT NULL,
|
||||||
|
event_type TEXT NOT NULL,
|
||||||
|
remote_address TEXT NULL,
|
||||||
|
created_utc TEXT NOT NULL,
|
||||||
|
details TEXT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS ix_api_keys_revoked_utc
|
||||||
|
ON api_keys (revoked_utc);
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS ix_api_key_audit_key_id_created_utc
|
||||||
|
ON api_key_audit (key_id, created_utc);
|
||||||
|
""",
|
||||||
|
cancellationToken).ConfigureAwait(false);
|
||||||
|
|
||||||
|
await using SqliteCommand versionCommand = connection.CreateCommand();
|
||||||
|
versionCommand.Transaction = transaction;
|
||||||
|
versionCommand.CommandText = """
|
||||||
|
INSERT INTO schema_version (id, version, applied_utc)
|
||||||
|
VALUES (1, $version, $applied_utc)
|
||||||
|
ON CONFLICT(id) DO UPDATE SET
|
||||||
|
version = excluded.version,
|
||||||
|
applied_utc = excluded.applied_utc;
|
||||||
|
""";
|
||||||
|
versionCommand.Parameters.AddWithValue("$version", SqliteAuthSchema.CurrentVersion);
|
||||||
|
versionCommand.Parameters.AddWithValue("$applied_utc", DateTimeOffset.UtcNow.ToString("O"));
|
||||||
|
|
||||||
|
await versionCommand.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static async Task ExecuteNonQueryAsync(
|
||||||
|
SqliteConnection connection,
|
||||||
|
SqliteTransaction transaction,
|
||||||
|
string commandText,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
await using SqliteCommand command = connection.CreateCommand();
|
||||||
|
command.Transaction = transaction;
|
||||||
|
command.CommandText = commandText;
|
||||||
|
|
||||||
|
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,32 @@
|
|||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Workers;
|
||||||
|
|
||||||
|
internal static class WorkerEnvelopeValidator
|
||||||
|
{
|
||||||
|
public static void Validate(
|
||||||
|
WorkerEnvelope envelope,
|
||||||
|
WorkerFrameProtocolOptions options)
|
||||||
|
{
|
||||||
|
if (envelope.ProtocolVersion != options.ProtocolVersion)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.ProtocolVersionMismatch,
|
||||||
|
$"Worker envelope protocol version {envelope.ProtocolVersion} does not match expected version {options.ProtocolVersion}.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!string.Equals(envelope.SessionId, options.SessionId, StringComparison.Ordinal))
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.SessionMismatch,
|
||||||
|
"Worker envelope session id does not match the owning gateway session.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (envelope.BodyCase == WorkerEnvelope.BodyOneofCase.None)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.InvalidEnvelope,
|
||||||
|
"Worker envelope must include a typed body.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,13 @@
|
|||||||
|
namespace MxGateway.Server.Workers;
|
||||||
|
|
||||||
|
public enum WorkerFrameProtocolErrorCode
|
||||||
|
{
|
||||||
|
Unknown = 0,
|
||||||
|
InvalidConfiguration = 1,
|
||||||
|
EndOfStream = 2,
|
||||||
|
MalformedLength = 3,
|
||||||
|
MessageTooLarge = 4,
|
||||||
|
InvalidEnvelope = 5,
|
||||||
|
ProtocolVersionMismatch = 6,
|
||||||
|
SessionMismatch = 7,
|
||||||
|
}
|
||||||
@@ -0,0 +1,23 @@
|
|||||||
|
namespace MxGateway.Server.Workers;
|
||||||
|
|
||||||
|
public sealed class WorkerFrameProtocolException : Exception
|
||||||
|
{
|
||||||
|
public WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode errorCode,
|
||||||
|
string message)
|
||||||
|
: base(message)
|
||||||
|
{
|
||||||
|
ErrorCode = errorCode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode errorCode,
|
||||||
|
string message,
|
||||||
|
Exception innerException)
|
||||||
|
: base(message, innerException)
|
||||||
|
{
|
||||||
|
ErrorCode = errorCode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public WorkerFrameProtocolErrorCode ErrorCode { get; }
|
||||||
|
}
|
||||||
@@ -0,0 +1,53 @@
|
|||||||
|
using MxGateway.Contracts;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Workers;
|
||||||
|
|
||||||
|
public sealed class WorkerFrameProtocolOptions
|
||||||
|
{
|
||||||
|
public const int DefaultMaxMessageBytes = 16 * 1024 * 1024;
|
||||||
|
|
||||||
|
public WorkerFrameProtocolOptions(string sessionId)
|
||||||
|
: this(
|
||||||
|
sessionId,
|
||||||
|
GatewayContractInfo.WorkerProtocolVersion,
|
||||||
|
DefaultMaxMessageBytes)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
public WorkerFrameProtocolOptions(
|
||||||
|
string sessionId,
|
||||||
|
uint protocolVersion,
|
||||||
|
int maxMessageBytes)
|
||||||
|
{
|
||||||
|
if (string.IsNullOrWhiteSpace(sessionId))
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.InvalidConfiguration,
|
||||||
|
"Worker frame protocol requires a session id.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (protocolVersion == 0)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.InvalidConfiguration,
|
||||||
|
"Worker frame protocol requires a non-zero protocol version.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (maxMessageBytes <= 0)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.InvalidConfiguration,
|
||||||
|
"Worker frame protocol max message size must be greater than zero.");
|
||||||
|
}
|
||||||
|
|
||||||
|
SessionId = sessionId;
|
||||||
|
ProtocolVersion = protocolVersion;
|
||||||
|
MaxMessageBytes = maxMessageBytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public string SessionId { get; }
|
||||||
|
|
||||||
|
public uint ProtocolVersion { get; }
|
||||||
|
|
||||||
|
public int MaxMessageBytes { get; }
|
||||||
|
}
|
||||||
@@ -0,0 +1,77 @@
|
|||||||
|
using System.Buffers.Binary;
|
||||||
|
using Google.Protobuf;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Workers;
|
||||||
|
|
||||||
|
public sealed class WorkerFrameReader
|
||||||
|
{
|
||||||
|
private readonly WorkerFrameProtocolOptions _options;
|
||||||
|
private readonly Stream _stream;
|
||||||
|
|
||||||
|
public WorkerFrameReader(
|
||||||
|
Stream stream,
|
||||||
|
WorkerFrameProtocolOptions options)
|
||||||
|
{
|
||||||
|
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
|
||||||
|
_options = options ?? throw new ArgumentNullException(nameof(options));
|
||||||
|
}
|
||||||
|
|
||||||
|
public async ValueTask<WorkerEnvelope> ReadAsync(CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
byte[] lengthPrefix = new byte[sizeof(uint)];
|
||||||
|
await ReadExactlyOrThrowAsync(lengthPrefix, cancellationToken).ConfigureAwait(false);
|
||||||
|
|
||||||
|
uint payloadLength = BinaryPrimitives.ReadUInt32LittleEndian(lengthPrefix);
|
||||||
|
if (payloadLength == 0)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.MalformedLength,
|
||||||
|
"Worker frame payload length must be greater than zero.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (payloadLength > _options.MaxMessageBytes)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.MessageTooLarge,
|
||||||
|
$"Worker frame payload length {payloadLength} exceeds the configured maximum of {_options.MaxMessageBytes} bytes.");
|
||||||
|
}
|
||||||
|
|
||||||
|
byte[] payload = new byte[payloadLength];
|
||||||
|
await ReadExactlyOrThrowAsync(payload, cancellationToken).ConfigureAwait(false);
|
||||||
|
|
||||||
|
WorkerEnvelope envelope;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
envelope = WorkerEnvelope.Parser.ParseFrom(payload);
|
||||||
|
}
|
||||||
|
catch (InvalidProtocolBufferException exception)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.InvalidEnvelope,
|
||||||
|
"Worker frame payload is not a valid WorkerEnvelope protobuf message.",
|
||||||
|
exception);
|
||||||
|
}
|
||||||
|
|
||||||
|
WorkerEnvelopeValidator.Validate(envelope, _options);
|
||||||
|
|
||||||
|
return envelope;
|
||||||
|
}
|
||||||
|
|
||||||
|
private async ValueTask ReadExactlyOrThrowAsync(
|
||||||
|
Memory<byte> buffer,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await _stream.ReadExactlyAsync(buffer, cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
catch (EndOfStreamException exception)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.EndOfStream,
|
||||||
|
"Worker frame ended before the expected number of bytes were read.",
|
||||||
|
exception);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,48 @@
|
|||||||
|
using System.Buffers.Binary;
|
||||||
|
using Google.Protobuf;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Workers;
|
||||||
|
|
||||||
|
public sealed class WorkerFrameWriter
|
||||||
|
{
|
||||||
|
private readonly WorkerFrameProtocolOptions _options;
|
||||||
|
private readonly Stream _stream;
|
||||||
|
|
||||||
|
public WorkerFrameWriter(
|
||||||
|
Stream stream,
|
||||||
|
WorkerFrameProtocolOptions options)
|
||||||
|
{
|
||||||
|
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
|
||||||
|
_options = options ?? throw new ArgumentNullException(nameof(options));
|
||||||
|
}
|
||||||
|
|
||||||
|
public async ValueTask WriteAsync(
|
||||||
|
WorkerEnvelope envelope,
|
||||||
|
CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(envelope);
|
||||||
|
WorkerEnvelopeValidator.Validate(envelope, _options);
|
||||||
|
|
||||||
|
int payloadLength = envelope.CalculateSize();
|
||||||
|
if (payloadLength == 0)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.InvalidEnvelope,
|
||||||
|
"Worker envelope cannot serialize to an empty payload.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (payloadLength > _options.MaxMessageBytes)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.MessageTooLarge,
|
||||||
|
$"Worker envelope payload length {payloadLength} exceeds the configured maximum of {_options.MaxMessageBytes} bytes.");
|
||||||
|
}
|
||||||
|
|
||||||
|
byte[] lengthPrefix = new byte[sizeof(uint)];
|
||||||
|
BinaryPrimitives.WriteUInt32LittleEndian(lengthPrefix, (uint)payloadLength);
|
||||||
|
|
||||||
|
await _stream.WriteAsync(lengthPrefix, cancellationToken).ConfigureAwait(false);
|
||||||
|
await _stream.WriteAsync(envelope.ToByteArray(), cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,209 @@
|
|||||||
|
using System.Buffers.Binary;
|
||||||
|
using Google.Protobuf;
|
||||||
|
using MxGateway.Contracts;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
using MxGateway.Server.Workers;
|
||||||
|
|
||||||
|
namespace MxGateway.Tests.Gateway.Workers;
|
||||||
|
|
||||||
|
public sealed class WorkerFrameProtocolTests
|
||||||
|
{
|
||||||
|
private const string SessionId = "session-1";
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task WriteAndReadAsync_WithValidEnvelope_RoundTripsFrame()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = new(SessionId);
|
||||||
|
await using MemoryStream stream = new();
|
||||||
|
WorkerEnvelope original = CreateEnvelope();
|
||||||
|
|
||||||
|
WorkerFrameWriter writer = new(stream, options);
|
||||||
|
await writer.WriteAsync(original);
|
||||||
|
stream.Position = 0;
|
||||||
|
|
||||||
|
WorkerFrameReader reader = new(stream, options);
|
||||||
|
WorkerEnvelope parsed = await reader.ReadAsync();
|
||||||
|
|
||||||
|
Assert.Equal(original, parsed);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadAsync_WithPartialReads_ReassemblesFrame()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = new(SessionId);
|
||||||
|
WorkerEnvelope original = CreateEnvelope();
|
||||||
|
byte[] frame = CreateFrame(original);
|
||||||
|
await using ChunkedReadStream stream = new(frame, chunkSize: 2);
|
||||||
|
|
||||||
|
WorkerFrameReader reader = new(stream, options);
|
||||||
|
WorkerEnvelope parsed = await reader.ReadAsync();
|
||||||
|
|
||||||
|
Assert.Equal(original, parsed);
|
||||||
|
Assert.True(stream.ReadCallCount > 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadAsync_WithZeroLengthFrame_ThrowsMalformedLength()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = new(SessionId);
|
||||||
|
await using MemoryStream stream = new(new byte[sizeof(uint)]);
|
||||||
|
|
||||||
|
WorkerFrameReader reader = new(stream, options);
|
||||||
|
WorkerFrameProtocolException exception =
|
||||||
|
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||||
|
async () => await reader.ReadAsync());
|
||||||
|
|
||||||
|
Assert.Equal(WorkerFrameProtocolErrorCode.MalformedLength, exception.ErrorCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadAsync_WithOversizedLength_ThrowsBeforePayloadAllocation()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = new(SessionId, GatewayContractInfo.WorkerProtocolVersion, maxMessageBytes: 16);
|
||||||
|
byte[] lengthPrefix = new byte[sizeof(uint)];
|
||||||
|
BinaryPrimitives.WriteUInt32LittleEndian(lengthPrefix, 17);
|
||||||
|
await using MemoryStream stream = new(lengthPrefix);
|
||||||
|
|
||||||
|
WorkerFrameReader reader = new(stream, options);
|
||||||
|
WorkerFrameProtocolException exception =
|
||||||
|
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||||
|
async () => await reader.ReadAsync());
|
||||||
|
|
||||||
|
Assert.Equal(WorkerFrameProtocolErrorCode.MessageTooLarge, exception.ErrorCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadAsync_WithWrongProtocolVersion_ThrowsProtocolVersionMismatch()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = new(SessionId);
|
||||||
|
WorkerEnvelope envelope = CreateEnvelope();
|
||||||
|
envelope.ProtocolVersion++;
|
||||||
|
await using MemoryStream stream = new(CreateFrame(envelope));
|
||||||
|
|
||||||
|
WorkerFrameReader reader = new(stream, options);
|
||||||
|
WorkerFrameProtocolException exception =
|
||||||
|
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||||
|
async () => await reader.ReadAsync());
|
||||||
|
|
||||||
|
Assert.Equal(WorkerFrameProtocolErrorCode.ProtocolVersionMismatch, exception.ErrorCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadAsync_WithWrongSessionId_ThrowsSessionMismatch()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = new(SessionId);
|
||||||
|
WorkerEnvelope envelope = CreateEnvelope();
|
||||||
|
envelope.SessionId = "different-session";
|
||||||
|
await using MemoryStream stream = new(CreateFrame(envelope));
|
||||||
|
|
||||||
|
WorkerFrameReader reader = new(stream, options);
|
||||||
|
WorkerFrameProtocolException exception =
|
||||||
|
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||||
|
async () => await reader.ReadAsync());
|
||||||
|
|
||||||
|
Assert.Equal(WorkerFrameProtocolErrorCode.SessionMismatch, exception.ErrorCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadAsync_WithMalformedPayload_ThrowsInvalidEnvelope()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = new(SessionId);
|
||||||
|
byte[] frame = CreateFrame([0x80]);
|
||||||
|
await using MemoryStream stream = new(frame);
|
||||||
|
|
||||||
|
WorkerFrameReader reader = new(stream, options);
|
||||||
|
WorkerFrameProtocolException exception =
|
||||||
|
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||||
|
async () => await reader.ReadAsync());
|
||||||
|
|
||||||
|
Assert.Equal(WorkerFrameProtocolErrorCode.InvalidEnvelope, exception.ErrorCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadAsync_WithMissingEnvelopeBody_ThrowsInvalidEnvelope()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = new(SessionId);
|
||||||
|
WorkerEnvelope envelope = CreateEnvelope();
|
||||||
|
envelope.ClearBody();
|
||||||
|
await using MemoryStream stream = new(CreateFrame(envelope));
|
||||||
|
|
||||||
|
WorkerFrameReader reader = new(stream, options);
|
||||||
|
WorkerFrameProtocolException exception =
|
||||||
|
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||||
|
async () => await reader.ReadAsync());
|
||||||
|
|
||||||
|
Assert.Equal(WorkerFrameProtocolErrorCode.InvalidEnvelope, exception.ErrorCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task WriteAsync_WithOversizedEnvelope_ThrowsMessageTooLarge()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = new(SessionId, GatewayContractInfo.WorkerProtocolVersion, maxMessageBytes: 8);
|
||||||
|
await using MemoryStream stream = new();
|
||||||
|
|
||||||
|
WorkerFrameWriter writer = new(stream, options);
|
||||||
|
WorkerFrameProtocolException exception =
|
||||||
|
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||||
|
async () => await writer.WriteAsync(CreateEnvelope()));
|
||||||
|
|
||||||
|
Assert.Equal(WorkerFrameProtocolErrorCode.MessageTooLarge, exception.ErrorCode);
|
||||||
|
Assert.Equal(0, stream.Length);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static WorkerEnvelope CreateEnvelope()
|
||||||
|
{
|
||||||
|
return new WorkerEnvelope
|
||||||
|
{
|
||||||
|
ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
|
||||||
|
SessionId = SessionId,
|
||||||
|
Sequence = 1,
|
||||||
|
CorrelationId = "correlation-1",
|
||||||
|
WorkerHello = new WorkerHello
|
||||||
|
{
|
||||||
|
ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
|
||||||
|
Nonce = "nonce",
|
||||||
|
WorkerProcessId = 1234,
|
||||||
|
WorkerVersion = "test-worker",
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static byte[] CreateFrame(IMessage message)
|
||||||
|
{
|
||||||
|
return CreateFrame(message.ToByteArray());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static byte[] CreateFrame(byte[] payload)
|
||||||
|
{
|
||||||
|
byte[] frame = new byte[sizeof(uint) + payload.Length];
|
||||||
|
BinaryPrimitives.WriteUInt32LittleEndian(frame.AsSpan(0, sizeof(uint)), (uint)payload.Length);
|
||||||
|
payload.CopyTo(frame.AsSpan(sizeof(uint)));
|
||||||
|
|
||||||
|
return frame;
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class ChunkedReadStream : MemoryStream
|
||||||
|
{
|
||||||
|
private readonly int _chunkSize;
|
||||||
|
|
||||||
|
public ChunkedReadStream(
|
||||||
|
byte[] buffer,
|
||||||
|
int chunkSize)
|
||||||
|
: base(buffer)
|
||||||
|
{
|
||||||
|
_chunkSize = chunkSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int ReadCallCount { get; private set; }
|
||||||
|
|
||||||
|
public override ValueTask<int> ReadAsync(
|
||||||
|
Memory<byte> buffer,
|
||||||
|
CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
ReadCallCount++;
|
||||||
|
int requestedCount = Math.Min(buffer.Length, _chunkSize);
|
||||||
|
|
||||||
|
return base.ReadAsync(buffer[..requestedCount], cancellationToken);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,280 @@
|
|||||||
|
using Microsoft.AspNetCore.Builder;
|
||||||
|
using Microsoft.Data.Sqlite;
|
||||||
|
using Microsoft.Extensions.Configuration;
|
||||||
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
|
using MxGateway.Server;
|
||||||
|
using MxGateway.Server.Configuration;
|
||||||
|
using MxGateway.Server.Security.Authentication;
|
||||||
|
|
||||||
|
namespace MxGateway.Tests.Security.Authentication;
|
||||||
|
|
||||||
|
public sealed class SqliteAuthStoreTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public async Task MigrateAsync_EmptyDatabase_InitializesCurrentSchema()
|
||||||
|
{
|
||||||
|
string databasePath = CreateTempDatabasePath();
|
||||||
|
await using ServiceProvider services = BuildAuthServices(databasePath);
|
||||||
|
|
||||||
|
IAuthStoreMigrator migrator = services.GetRequiredService<IAuthStoreMigrator>();
|
||||||
|
|
||||||
|
await migrator.MigrateAsync(CancellationToken.None);
|
||||||
|
|
||||||
|
Assert.Equal(SqliteAuthSchema.CurrentVersion, await ReadSchemaVersionAsync(databasePath));
|
||||||
|
Assert.True(await TableExistsAsync(databasePath, SqliteAuthSchema.ApiKeysTable));
|
||||||
|
Assert.True(await TableExistsAsync(databasePath, SqliteAuthSchema.ApiKeyAuditTable));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task MigrateAsync_ExistingVersionZeroDatabase_MigratesIdempotently()
|
||||||
|
{
|
||||||
|
string databasePath = CreateTempDatabasePath();
|
||||||
|
await CreateVersionZeroDatabaseAsync(databasePath);
|
||||||
|
await using ServiceProvider services = BuildAuthServices(databasePath);
|
||||||
|
|
||||||
|
IAuthStoreMigrator migrator = services.GetRequiredService<IAuthStoreMigrator>();
|
||||||
|
|
||||||
|
await migrator.MigrateAsync(CancellationToken.None);
|
||||||
|
await migrator.MigrateAsync(CancellationToken.None);
|
||||||
|
|
||||||
|
Assert.Equal(SqliteAuthSchema.CurrentVersion, await ReadSchemaVersionAsync(databasePath));
|
||||||
|
Assert.True(await TableExistsAsync(databasePath, SqliteAuthSchema.ApiKeysTable));
|
||||||
|
Assert.True(await TableExistsAsync(databasePath, SqliteAuthSchema.ApiKeyAuditTable));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task StartAsync_NewerSchemaVersion_BlocksStartup()
|
||||||
|
{
|
||||||
|
string databasePath = CreateTempDatabasePath();
|
||||||
|
await CreateSchemaVersionDatabaseAsync(databasePath, SqliteAuthSchema.CurrentVersion + 1);
|
||||||
|
|
||||||
|
await using WebApplication app = GatewayApplication.Build(
|
||||||
|
[
|
||||||
|
$"--MxGateway:Authentication:SqlitePath={databasePath}",
|
||||||
|
"--urls=http://127.0.0.1:0"
|
||||||
|
]);
|
||||||
|
|
||||||
|
AuthStoreMigrationException exception = await Assert.ThrowsAsync<AuthStoreMigrationException>(
|
||||||
|
() => app.StartAsync(CancellationToken.None));
|
||||||
|
|
||||||
|
Assert.Contains("newer than supported version", exception.Message, StringComparison.Ordinal);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task FindActiveByKeyIdAsync_ExistingActiveKey_ReturnsKey()
|
||||||
|
{
|
||||||
|
string databasePath = CreateTempDatabasePath();
|
||||||
|
await using ServiceProvider services = BuildAuthServices(databasePath);
|
||||||
|
await services.GetRequiredService<IAuthStoreMigrator>().MigrateAsync(CancellationToken.None);
|
||||||
|
await InsertApiKeyAsync(databasePath, revokedUtc: null);
|
||||||
|
|
||||||
|
IApiKeyStore store = services.GetRequiredService<IApiKeyStore>();
|
||||||
|
|
||||||
|
ApiKeyRecord? key = await store.FindActiveByKeyIdAsync("test-key", CancellationToken.None);
|
||||||
|
|
||||||
|
Assert.NotNull(key);
|
||||||
|
Assert.Equal("test-key", key.KeyId);
|
||||||
|
Assert.Equal("mxgw_test", key.KeyPrefix);
|
||||||
|
Assert.Equal([1, 2, 3, 4], key.SecretHash);
|
||||||
|
Assert.Contains("session:open", key.Scopes);
|
||||||
|
Assert.Null(key.RevokedUtc);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task FindActiveByKeyIdAsync_RevokedKey_ReturnsNull()
|
||||||
|
{
|
||||||
|
string databasePath = CreateTempDatabasePath();
|
||||||
|
await using ServiceProvider services = BuildAuthServices(databasePath);
|
||||||
|
await services.GetRequiredService<IAuthStoreMigrator>().MigrateAsync(CancellationToken.None);
|
||||||
|
await InsertApiKeyAsync(databasePath, DateTimeOffset.UtcNow);
|
||||||
|
|
||||||
|
IApiKeyStore store = services.GetRequiredService<IApiKeyStore>();
|
||||||
|
|
||||||
|
ApiKeyRecord? activeKey = await store.FindActiveByKeyIdAsync(
|
||||||
|
"test-key",
|
||||||
|
CancellationToken.None);
|
||||||
|
ApiKeyRecord? storedKey = await store.FindByKeyIdAsync("test-key", CancellationToken.None);
|
||||||
|
|
||||||
|
Assert.Null(activeKey);
|
||||||
|
Assert.NotNull(storedKey);
|
||||||
|
Assert.NotNull(storedKey.RevokedUtc);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ApiKeyAuditStore_AppendAsync_PersistsAuditEvent()
|
||||||
|
{
|
||||||
|
string databasePath = CreateTempDatabasePath();
|
||||||
|
await using ServiceProvider services = BuildAuthServices(databasePath);
|
||||||
|
await services.GetRequiredService<IAuthStoreMigrator>().MigrateAsync(CancellationToken.None);
|
||||||
|
|
||||||
|
IApiKeyAuditStore auditStore = services.GetRequiredService<IApiKeyAuditStore>();
|
||||||
|
|
||||||
|
await auditStore.AppendAsync(
|
||||||
|
new ApiKeyAuditEntry(
|
||||||
|
KeyId: "test-key",
|
||||||
|
EventType: "lookup",
|
||||||
|
RemoteAddress: "127.0.0.1",
|
||||||
|
Details: "matched active key"),
|
||||||
|
CancellationToken.None);
|
||||||
|
|
||||||
|
IReadOnlyList<ApiKeyAuditRecord> records = await auditStore.ListRecentAsync(
|
||||||
|
10,
|
||||||
|
CancellationToken.None);
|
||||||
|
|
||||||
|
ApiKeyAuditRecord record = Assert.Single(records);
|
||||||
|
Assert.Equal("test-key", record.KeyId);
|
||||||
|
Assert.Equal("lookup", record.EventType);
|
||||||
|
Assert.Equal("127.0.0.1", record.RemoteAddress);
|
||||||
|
Assert.Equal("matched active key", record.Details);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ServiceProvider BuildAuthServices(string databasePath)
|
||||||
|
{
|
||||||
|
IConfigurationRoot configuration = new ConfigurationBuilder()
|
||||||
|
.AddInMemoryCollection(
|
||||||
|
new Dictionary<string, string?>
|
||||||
|
{
|
||||||
|
["MxGateway:Authentication:SqlitePath"] = databasePath
|
||||||
|
})
|
||||||
|
.Build();
|
||||||
|
|
||||||
|
ServiceCollection services = new();
|
||||||
|
services.AddSingleton<IConfiguration>(configuration);
|
||||||
|
services.AddGatewayConfiguration();
|
||||||
|
services.AddSqliteAuthStore();
|
||||||
|
|
||||||
|
return services.BuildServiceProvider(validateScopes: true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static string CreateTempDatabasePath()
|
||||||
|
{
|
||||||
|
string directory = Path.Combine(Path.GetTempPath(), "mxgateway-auth-tests", Guid.NewGuid().ToString("N"));
|
||||||
|
Directory.CreateDirectory(directory);
|
||||||
|
|
||||||
|
return Path.Combine(directory, "gateway-auth.db");
|
||||||
|
}
|
||||||
|
|
||||||
|
private static async Task CreateVersionZeroDatabaseAsync(string databasePath)
|
||||||
|
{
|
||||||
|
await using SqliteConnection connection = CreateConnection(databasePath);
|
||||||
|
await connection.OpenAsync(CancellationToken.None);
|
||||||
|
|
||||||
|
await using SqliteCommand command = connection.CreateCommand();
|
||||||
|
command.CommandText = """
|
||||||
|
CREATE TABLE schema_version (
|
||||||
|
id INTEGER PRIMARY KEY CHECK (id = 1),
|
||||||
|
version INTEGER NOT NULL,
|
||||||
|
applied_utc TEXT NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
INSERT INTO schema_version (id, version, applied_utc)
|
||||||
|
VALUES (1, 0, $applied_utc);
|
||||||
|
""";
|
||||||
|
command.Parameters.AddWithValue("$applied_utc", DateTimeOffset.UtcNow.ToString("O"));
|
||||||
|
|
||||||
|
await command.ExecuteNonQueryAsync(CancellationToken.None);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static async Task CreateSchemaVersionDatabaseAsync(string databasePath, int version)
|
||||||
|
{
|
||||||
|
await using SqliteConnection connection = CreateConnection(databasePath);
|
||||||
|
await connection.OpenAsync(CancellationToken.None);
|
||||||
|
|
||||||
|
await using SqliteCommand command = connection.CreateCommand();
|
||||||
|
command.CommandText = """
|
||||||
|
CREATE TABLE schema_version (
|
||||||
|
id INTEGER PRIMARY KEY CHECK (id = 1),
|
||||||
|
version INTEGER NOT NULL,
|
||||||
|
applied_utc TEXT NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
INSERT INTO schema_version (id, version, applied_utc)
|
||||||
|
VALUES (1, $version, $applied_utc);
|
||||||
|
""";
|
||||||
|
command.Parameters.AddWithValue("$version", version);
|
||||||
|
command.Parameters.AddWithValue("$applied_utc", DateTimeOffset.UtcNow.ToString("O"));
|
||||||
|
|
||||||
|
await command.ExecuteNonQueryAsync(CancellationToken.None);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static async Task InsertApiKeyAsync(string databasePath, DateTimeOffset? revokedUtc)
|
||||||
|
{
|
||||||
|
await using SqliteConnection connection = CreateConnection(databasePath);
|
||||||
|
await connection.OpenAsync(CancellationToken.None);
|
||||||
|
|
||||||
|
await using SqliteCommand command = connection.CreateCommand();
|
||||||
|
command.CommandText = """
|
||||||
|
INSERT INTO api_keys (
|
||||||
|
key_id,
|
||||||
|
key_prefix,
|
||||||
|
secret_hash,
|
||||||
|
display_name,
|
||||||
|
scopes,
|
||||||
|
created_utc,
|
||||||
|
last_used_utc,
|
||||||
|
revoked_utc)
|
||||||
|
VALUES (
|
||||||
|
$key_id,
|
||||||
|
$key_prefix,
|
||||||
|
$secret_hash,
|
||||||
|
$display_name,
|
||||||
|
$scopes,
|
||||||
|
$created_utc,
|
||||||
|
NULL,
|
||||||
|
$revoked_utc);
|
||||||
|
""";
|
||||||
|
command.Parameters.AddWithValue("$key_id", "test-key");
|
||||||
|
command.Parameters.AddWithValue("$key_prefix", "mxgw_test");
|
||||||
|
command.Parameters.Add("$secret_hash", SqliteType.Blob).Value = new byte[] { 1, 2, 3, 4 };
|
||||||
|
command.Parameters.AddWithValue("$display_name", "Test Key");
|
||||||
|
command.Parameters.AddWithValue(
|
||||||
|
"$scopes",
|
||||||
|
ApiKeyScopeSerializer.Serialize(new HashSet<string>(StringComparer.Ordinal) { "session:open", "events:read" }));
|
||||||
|
command.Parameters.AddWithValue("$created_utc", DateTimeOffset.UtcNow.ToString("O"));
|
||||||
|
command.Parameters.AddWithValue("$revoked_utc", revokedUtc?.ToString("O") ?? (object)DBNull.Value);
|
||||||
|
|
||||||
|
await command.ExecuteNonQueryAsync(CancellationToken.None);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static async Task<int> ReadSchemaVersionAsync(string databasePath)
|
||||||
|
{
|
||||||
|
await using SqliteConnection connection = CreateConnection(databasePath);
|
||||||
|
await connection.OpenAsync(CancellationToken.None);
|
||||||
|
|
||||||
|
await using SqliteCommand command = connection.CreateCommand();
|
||||||
|
command.CommandText = "SELECT version FROM schema_version WHERE id = 1;";
|
||||||
|
|
||||||
|
object? result = await command.ExecuteScalarAsync(CancellationToken.None);
|
||||||
|
|
||||||
|
return Convert.ToInt32(result, System.Globalization.CultureInfo.InvariantCulture);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static async Task<bool> TableExistsAsync(string databasePath, string tableName)
|
||||||
|
{
|
||||||
|
await using SqliteConnection connection = CreateConnection(databasePath);
|
||||||
|
await connection.OpenAsync(CancellationToken.None);
|
||||||
|
|
||||||
|
await using SqliteCommand command = connection.CreateCommand();
|
||||||
|
command.CommandText = """
|
||||||
|
SELECT COUNT(*)
|
||||||
|
FROM sqlite_master
|
||||||
|
WHERE type = 'table' AND name = $table_name;
|
||||||
|
""";
|
||||||
|
command.Parameters.AddWithValue("$table_name", tableName);
|
||||||
|
|
||||||
|
long result = (long)(await command.ExecuteScalarAsync(CancellationToken.None) ?? 0L);
|
||||||
|
|
||||||
|
return result == 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static SqliteConnection CreateConnection(string databasePath)
|
||||||
|
{
|
||||||
|
SqliteConnectionStringBuilder builder = new()
|
||||||
|
{
|
||||||
|
DataSource = databasePath,
|
||||||
|
Mode = SqliteOpenMode.ReadWriteCreate
|
||||||
|
};
|
||||||
|
|
||||||
|
return new SqliteConnection(builder.ToString());
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user