Compare commits

...

4 Commits

Author SHA1 Message Date
Joseph Doherty ec1155de6d Issue #5: implement sqlite auth store and migrations 2026-04-26 16:29:38 -04:00
dohertj2 0c539834dc Merge PR #55: Issue #9 implement worker frame protocol
Verified with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln.
2026-04-26 16:24:38 -04:00
Joseph Doherty a5098e6815 Issue #9: implement worker frame protocol 2026-04-26 16:20:59 -04:00
dohertj2 41ddd122a6 Merge PR #53: Issue #4 add structured logging and metrics foundation
Verified after merging origin/main and resolving startup/test conflicts with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln.
2026-04-26 16:19:24 -04:00
28 changed files with 1255 additions and 0 deletions
+54
View File
@@ -0,0 +1,54 @@
# Worker Frame Protocol
The gateway uses the worker frame protocol to move `WorkerEnvelope` protobuf
messages over a bidirectional named pipe. The frame layer is deliberately small:
it handles message boundaries, size limits, protobuf parsing, and envelope
validation before higher-level worker client code routes commands, replies,
events, and faults.
## Frame Format
Each frame starts with a four-byte little-endian unsigned payload length,
followed by the serialized `WorkerEnvelope` payload:
```text
uint32 little-endian payload_length
payload_length bytes protobuf WorkerEnvelope
```
The reader rejects zero-length payloads and payloads larger than the configured
maximum before allocating the payload buffer. The default maximum is 16 MiB,
matching the gateway process design.
## Envelope Validation
`WorkerFrameReader` and `WorkerFrameWriter` validate each envelope against the
owning session before returning or writing it:
- `protocol_version` must match the configured worker protocol version,
- `session_id` must match the owning gateway session,
- the envelope must contain one typed `body` value.
Protocol violations throw `WorkerFrameProtocolException` with a
`WorkerFrameProtocolErrorCode` so callers can distinguish malformed frames,
oversized frames, protocol version mismatches, and session mismatches.
## Verification
Run the focused tests after changing the frame protocol:
```bash
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter WorkerFrameProtocolTests
```
Run the gateway build because the frame protocol is part of
`MxGateway.Server`:
```bash
dotnet build src/MxGateway.Server/MxGateway.Server.csproj
```
## Related Documentation
- [Gateway Process Detailed Design](./gateway-process-design.md)
- [Protobuf Contracts](./Contracts.md)
+17
View File
@@ -612,6 +612,23 @@ SQLite auth storage should use startup migrations with a `schema_version` table.
Migrations should run inside transactions and fail startup if the database
schema is newer than the running binary understands.
The v1 auth store uses `Microsoft.Data.Sqlite` and creates the
`schema_version`, `api_keys`, and `api_key_audit` tables through
`SqliteAuthStoreMigrator`. `AuthStoreMigrationHostedService` runs those
migrations at gateway startup when API-key authentication and
`Authentication:RunMigrationsOnStartup` are enabled. A database with a newer
schema version fails startup instead of being modified by an older gateway
binary.
`IApiKeyStore` reads stored key records and exposes an active-key lookup that
excludes rows with `revoked_utc` set. Hash verification belongs to the API-key
hashing layer, but the store preserves the `secret_hash` bytes, display name,
scopes, timestamps, and revocation state needed by that layer.
`IApiKeyAuditStore` appends audit events to `api_key_audit` and returns recent
events for diagnostics and future administrative tools. Audit records store key
ids and event metadata only; they do not store raw API key secrets.
Commands requiring authorization:
- writes,
+2
View File
@@ -45,6 +45,8 @@ Detailed follow-up docs:
- `docs/gateway-process-design.md` covers the .NET 10 gateway process,
session manager, worker supervision, gRPC API, event streaming, fault model,
security, observability, and test strategy.
- `docs/WorkerFrameProtocol.md` covers the gateway-side named-pipe frame
reader/writer and `WorkerEnvelope` validation rules.
- `docs/mxaccess-worker-instance-design.md` covers each .NET Framework 4.8 x86
MXAccess worker instance, including STA ownership, message pumping, COM
lifetime, command dispatch, event sinks, conversion, and shutdown.
@@ -2,6 +2,7 @@ using MxGateway.Contracts;
using MxGateway.Server.Configuration;
using MxGateway.Server.Diagnostics;
using MxGateway.Server.Metrics;
using MxGateway.Server.Security.Authentication;
namespace MxGateway.Server;
@@ -23,6 +24,7 @@ public static class GatewayApplication
WebApplicationBuilder builder = WebApplication.CreateBuilder(args);
builder.Services.AddGatewayConfiguration();
builder.Services.AddSqliteAuthStore();
builder.Services.AddHealthChecks();
builder.Services.AddSingleton<GatewayMetrics>();
@@ -4,6 +4,10 @@
<TargetFramework>net10.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Data.Sqlite" Version="10.0.7" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\MxGateway.Contracts\MxGateway.Contracts.csproj" />
</ItemGroup>
@@ -0,0 +1,7 @@
namespace MxGateway.Server.Security.Authentication;
public sealed record ApiKeyAuditEntry(
string? KeyId,
string EventType,
string? RemoteAddress,
string? Details);
@@ -0,0 +1,9 @@
namespace MxGateway.Server.Security.Authentication;
public sealed record ApiKeyAuditRecord(
long AuditId,
string? KeyId,
string EventType,
string? RemoteAddress,
DateTimeOffset CreatedUtc,
string? Details);
@@ -0,0 +1,11 @@
namespace MxGateway.Server.Security.Authentication;
public sealed record ApiKeyRecord(
string KeyId,
string KeyPrefix,
byte[] SecretHash,
string DisplayName,
IReadOnlySet<string> Scopes,
DateTimeOffset CreatedUtc,
DateTimeOffset? LastUsedUtc,
DateTimeOffset? RevokedUtc);
@@ -0,0 +1,23 @@
using System.Text.Json;
namespace MxGateway.Server.Security.Authentication;
public static class ApiKeyScopeSerializer
{
public static string Serialize(IReadOnlySet<string> scopes)
{
return JsonSerializer.Serialize(scopes.Order(StringComparer.Ordinal));
}
public static IReadOnlySet<string> Deserialize(string value)
{
if (string.IsNullOrWhiteSpace(value))
{
return new HashSet<string>(StringComparer.Ordinal);
}
string[]? scopes = JsonSerializer.Deserialize<string[]>(value);
return new HashSet<string>(scopes ?? [], StringComparer.Ordinal);
}
}
@@ -0,0 +1,27 @@
using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Options;
using MxGateway.Server.Configuration;
namespace MxGateway.Server.Security.Authentication;
public sealed class AuthSqliteConnectionFactory(IOptions<GatewayOptions> options)
{
public SqliteConnection CreateConnection()
{
string sqlitePath = options.Value.Authentication.SqlitePath;
string? directory = Path.GetDirectoryName(sqlitePath);
if (!string.IsNullOrWhiteSpace(directory))
{
Directory.CreateDirectory(directory);
}
SqliteConnectionStringBuilder builder = new()
{
DataSource = sqlitePath,
Mode = SqliteOpenMode.ReadWriteCreate
};
return new SqliteConnection(builder.ToString());
}
}
@@ -0,0 +1,3 @@
namespace MxGateway.Server.Security.Authentication;
public sealed class AuthStoreMigrationException(string message) : InvalidOperationException(message);
@@ -0,0 +1,24 @@
using Microsoft.Extensions.Options;
using MxGateway.Server.Configuration;
namespace MxGateway.Server.Security.Authentication;
public sealed class AuthStoreMigrationHostedService(
IOptions<GatewayOptions> options,
IAuthStoreMigrator migrator) : IHostedService
{
public async Task StartAsync(CancellationToken cancellationToken)
{
AuthenticationOptions authentication = options.Value.Authentication;
if (authentication.Mode == AuthenticationMode.ApiKey && authentication.RunMigrationsOnStartup)
{
await migrator.MigrateAsync(cancellationToken).ConfigureAwait(false);
}
}
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
@@ -0,0 +1,15 @@
namespace MxGateway.Server.Security.Authentication;
public static class AuthStoreServiceCollectionExtensions
{
public static IServiceCollection AddSqliteAuthStore(this IServiceCollection services)
{
services.AddSingleton<AuthSqliteConnectionFactory>();
services.AddSingleton<IAuthStoreMigrator, SqliteAuthStoreMigrator>();
services.AddSingleton<IApiKeyStore, SqliteApiKeyStore>();
services.AddSingleton<IApiKeyAuditStore, SqliteApiKeyAuditStore>();
services.AddHostedService<AuthStoreMigrationHostedService>();
return services;
}
}
@@ -0,0 +1,8 @@
namespace MxGateway.Server.Security.Authentication;
public interface IApiKeyAuditStore
{
Task AppendAsync(ApiKeyAuditEntry entry, CancellationToken cancellationToken);
Task<IReadOnlyList<ApiKeyAuditRecord>> ListRecentAsync(int count, CancellationToken cancellationToken);
}
@@ -0,0 +1,10 @@
namespace MxGateway.Server.Security.Authentication;
public interface IApiKeyStore
{
Task<ApiKeyRecord?> FindByKeyIdAsync(string keyId, CancellationToken cancellationToken);
Task<ApiKeyRecord?> FindActiveByKeyIdAsync(string keyId, CancellationToken cancellationToken);
Task MarkKeyUsedAsync(string keyId, DateTimeOffset usedUtc, CancellationToken cancellationToken);
}
@@ -0,0 +1,6 @@
namespace MxGateway.Server.Security.Authentication;
public interface IAuthStoreMigrator
{
Task MigrateAsync(CancellationToken cancellationToken);
}
@@ -0,0 +1,65 @@
using Microsoft.Data.Sqlite;
namespace MxGateway.Server.Security.Authentication;
public sealed class SqliteApiKeyAuditStore(AuthSqliteConnectionFactory connectionFactory) : IApiKeyAuditStore
{
public async Task AppendAsync(ApiKeyAuditEntry entry, CancellationToken cancellationToken)
{
await using SqliteConnection connection = connectionFactory.CreateConnection();
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
await using SqliteCommand command = connection.CreateCommand();
command.CommandText = """
INSERT INTO api_key_audit (key_id, event_type, remote_address, created_utc, details)
VALUES ($key_id, $event_type, $remote_address, $created_utc, $details);
""";
command.Parameters.AddWithValue("$key_id", (object?)entry.KeyId ?? DBNull.Value);
command.Parameters.AddWithValue("$event_type", entry.EventType);
command.Parameters.AddWithValue("$remote_address", (object?)entry.RemoteAddress ?? DBNull.Value);
command.Parameters.AddWithValue("$created_utc", DateTimeOffset.UtcNow.ToString("O"));
command.Parameters.AddWithValue("$details", (object?)entry.Details ?? DBNull.Value);
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
public async Task<IReadOnlyList<ApiKeyAuditRecord>> ListRecentAsync(int count, CancellationToken cancellationToken)
{
if (count <= 0)
{
return [];
}
await using SqliteConnection connection = connectionFactory.CreateConnection();
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
await using SqliteCommand command = connection.CreateCommand();
command.CommandText = """
SELECT audit_id, key_id, event_type, remote_address, created_utc, details
FROM api_key_audit
ORDER BY audit_id DESC
LIMIT $count;
""";
command.Parameters.AddWithValue("$count", count);
List<ApiKeyAuditRecord> records = [];
await using SqliteDataReader reader = await command.ExecuteReaderAsync(cancellationToken)
.ConfigureAwait(false);
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
records.Add(new ApiKeyAuditRecord(
AuditId: reader.GetInt64(0),
KeyId: reader.IsDBNull(1) ? null : reader.GetString(1),
EventType: reader.GetString(2),
RemoteAddress: reader.IsDBNull(3) ? null : reader.GetString(3),
CreatedUtc: DateTimeOffset.Parse(
reader.GetString(4),
System.Globalization.CultureInfo.InvariantCulture),
Details: reader.IsDBNull(5) ? null : reader.GetString(5)));
}
return records;
}
}
@@ -0,0 +1,86 @@
using Microsoft.Data.Sqlite;
namespace MxGateway.Server.Security.Authentication;
public sealed class SqliteApiKeyStore(AuthSqliteConnectionFactory connectionFactory) : IApiKeyStore
{
public Task<ApiKeyRecord?> FindByKeyIdAsync(string keyId, CancellationToken cancellationToken)
{
return FindByKeyIdAsync(keyId, requireActive: false, cancellationToken);
}
public Task<ApiKeyRecord?> FindActiveByKeyIdAsync(string keyId, CancellationToken cancellationToken)
{
return FindByKeyIdAsync(keyId, requireActive: true, cancellationToken);
}
public async Task MarkKeyUsedAsync(string keyId, DateTimeOffset usedUtc, CancellationToken cancellationToken)
{
await using SqliteConnection connection = connectionFactory.CreateConnection();
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
await using SqliteCommand command = connection.CreateCommand();
command.CommandText = """
UPDATE api_keys
SET last_used_utc = $last_used_utc
WHERE key_id = $key_id AND revoked_utc IS NULL;
""";
command.Parameters.AddWithValue("$key_id", keyId);
command.Parameters.AddWithValue("$last_used_utc", usedUtc.ToString("O"));
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
private async Task<ApiKeyRecord?> FindByKeyIdAsync(
string keyId,
bool requireActive,
CancellationToken cancellationToken)
{
await using SqliteConnection connection = connectionFactory.CreateConnection();
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
await using SqliteCommand command = connection.CreateCommand();
command.CommandText = requireActive
? """
SELECT key_id, key_prefix, secret_hash, display_name, scopes, created_utc, last_used_utc, revoked_utc
FROM api_keys
WHERE key_id = $key_id AND revoked_utc IS NULL;
"""
: """
SELECT key_id, key_prefix, secret_hash, display_name, scopes, created_utc, last_used_utc, revoked_utc
FROM api_keys
WHERE key_id = $key_id;
""";
command.Parameters.AddWithValue("$key_id", keyId);
await using SqliteDataReader reader = await command.ExecuteReaderAsync(cancellationToken)
.ConfigureAwait(false);
if (!await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
return null;
}
return ReadApiKeyRecord(reader);
}
private static ApiKeyRecord ReadApiKeyRecord(SqliteDataReader reader)
{
return new ApiKeyRecord(
KeyId: reader.GetString(0),
KeyPrefix: reader.GetString(1),
SecretHash: (byte[])reader["secret_hash"],
DisplayName: reader.GetString(3),
Scopes: ApiKeyScopeSerializer.Deserialize(reader.GetString(4)),
CreatedUtc: DateTimeOffset.Parse(reader.GetString(5), System.Globalization.CultureInfo.InvariantCulture),
LastUsedUtc: ReadNullableDateTimeOffset(reader, 6),
RevokedUtc: ReadNullableDateTimeOffset(reader, 7));
}
private static DateTimeOffset? ReadNullableDateTimeOffset(SqliteDataReader reader, int ordinal)
{
return reader.IsDBNull(ordinal)
? null
: DateTimeOffset.Parse(reader.GetString(ordinal), System.Globalization.CultureInfo.InvariantCulture);
}
}
@@ -0,0 +1,12 @@
namespace MxGateway.Server.Security.Authentication;
public static class SqliteAuthSchema
{
public const int CurrentVersion = 1;
public const string SchemaVersionTable = "schema_version";
public const string ApiKeysTable = "api_keys";
public const string ApiKeyAuditTable = "api_key_audit";
}
@@ -0,0 +1,135 @@
using Microsoft.Data.Sqlite;
namespace MxGateway.Server.Security.Authentication;
public sealed class SqliteAuthStoreMigrator(AuthSqliteConnectionFactory connectionFactory) : IAuthStoreMigrator
{
public async Task MigrateAsync(CancellationToken cancellationToken)
{
await using SqliteConnection connection = connectionFactory.CreateConnection();
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
await using SqliteTransaction transaction =
(SqliteTransaction)await connection.BeginTransactionAsync(cancellationToken).ConfigureAwait(false);
int existingVersion = await ReadExistingSchemaVersionAsync(connection, transaction, cancellationToken)
.ConfigureAwait(false);
if (existingVersion > SqliteAuthSchema.CurrentVersion)
{
throw new AuthStoreMigrationException(
$"Auth database schema version {existingVersion} is newer than supported version {SqliteAuthSchema.CurrentVersion}.");
}
await ApplyVersionOneAsync(connection, transaction, cancellationToken).ConfigureAwait(false);
await transaction.CommitAsync(cancellationToken).ConfigureAwait(false);
}
private static async Task<int> ReadExistingSchemaVersionAsync(
SqliteConnection connection,
SqliteTransaction transaction,
CancellationToken cancellationToken)
{
await using SqliteCommand tableExistsCommand = connection.CreateCommand();
tableExistsCommand.Transaction = transaction;
tableExistsCommand.CommandText = """
SELECT COUNT(*)
FROM sqlite_master
WHERE type = 'table' AND name = $table_name;
""";
tableExistsCommand.Parameters.AddWithValue("$table_name", SqliteAuthSchema.SchemaVersionTable);
long tableCount = (long)(await tableExistsCommand.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false) ?? 0L);
if (tableCount == 0)
{
return 0;
}
await using SqliteCommand versionCommand = connection.CreateCommand();
versionCommand.Transaction = transaction;
versionCommand.CommandText = """
SELECT version
FROM schema_version
WHERE id = 1;
""";
object? version = await versionCommand.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false);
return version is null || version == DBNull.Value
? 0
: Convert.ToInt32(version, System.Globalization.CultureInfo.InvariantCulture);
}
private static async Task ApplyVersionOneAsync(
SqliteConnection connection,
SqliteTransaction transaction,
CancellationToken cancellationToken)
{
await ExecuteNonQueryAsync(
connection,
transaction,
"""
CREATE TABLE IF NOT EXISTS schema_version (
id INTEGER PRIMARY KEY CHECK (id = 1),
version INTEGER NOT NULL,
applied_utc TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS api_keys (
key_id TEXT PRIMARY KEY,
key_prefix TEXT NOT NULL,
secret_hash BLOB NOT NULL,
display_name TEXT NOT NULL,
scopes TEXT NOT NULL,
created_utc TEXT NOT NULL,
last_used_utc TEXT NULL,
revoked_utc TEXT NULL
);
CREATE TABLE IF NOT EXISTS api_key_audit (
audit_id INTEGER PRIMARY KEY AUTOINCREMENT,
key_id TEXT NULL,
event_type TEXT NOT NULL,
remote_address TEXT NULL,
created_utc TEXT NOT NULL,
details TEXT NULL
);
CREATE INDEX IF NOT EXISTS ix_api_keys_revoked_utc
ON api_keys (revoked_utc);
CREATE INDEX IF NOT EXISTS ix_api_key_audit_key_id_created_utc
ON api_key_audit (key_id, created_utc);
""",
cancellationToken).ConfigureAwait(false);
await using SqliteCommand versionCommand = connection.CreateCommand();
versionCommand.Transaction = transaction;
versionCommand.CommandText = """
INSERT INTO schema_version (id, version, applied_utc)
VALUES (1, $version, $applied_utc)
ON CONFLICT(id) DO UPDATE SET
version = excluded.version,
applied_utc = excluded.applied_utc;
""";
versionCommand.Parameters.AddWithValue("$version", SqliteAuthSchema.CurrentVersion);
versionCommand.Parameters.AddWithValue("$applied_utc", DateTimeOffset.UtcNow.ToString("O"));
await versionCommand.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
private static async Task ExecuteNonQueryAsync(
SqliteConnection connection,
SqliteTransaction transaction,
string commandText,
CancellationToken cancellationToken)
{
await using SqliteCommand command = connection.CreateCommand();
command.Transaction = transaction;
command.CommandText = commandText;
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
}
@@ -0,0 +1,32 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Workers;
internal static class WorkerEnvelopeValidator
{
public static void Validate(
WorkerEnvelope envelope,
WorkerFrameProtocolOptions options)
{
if (envelope.ProtocolVersion != options.ProtocolVersion)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.ProtocolVersionMismatch,
$"Worker envelope protocol version {envelope.ProtocolVersion} does not match expected version {options.ProtocolVersion}.");
}
if (!string.Equals(envelope.SessionId, options.SessionId, StringComparison.Ordinal))
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.SessionMismatch,
"Worker envelope session id does not match the owning gateway session.");
}
if (envelope.BodyCase == WorkerEnvelope.BodyOneofCase.None)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.InvalidEnvelope,
"Worker envelope must include a typed body.");
}
}
}
@@ -0,0 +1,13 @@
namespace MxGateway.Server.Workers;
public enum WorkerFrameProtocolErrorCode
{
Unknown = 0,
InvalidConfiguration = 1,
EndOfStream = 2,
MalformedLength = 3,
MessageTooLarge = 4,
InvalidEnvelope = 5,
ProtocolVersionMismatch = 6,
SessionMismatch = 7,
}
@@ -0,0 +1,23 @@
namespace MxGateway.Server.Workers;
public sealed class WorkerFrameProtocolException : Exception
{
public WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode errorCode,
string message)
: base(message)
{
ErrorCode = errorCode;
}
public WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode errorCode,
string message,
Exception innerException)
: base(message, innerException)
{
ErrorCode = errorCode;
}
public WorkerFrameProtocolErrorCode ErrorCode { get; }
}
@@ -0,0 +1,53 @@
using MxGateway.Contracts;
namespace MxGateway.Server.Workers;
public sealed class WorkerFrameProtocolOptions
{
public const int DefaultMaxMessageBytes = 16 * 1024 * 1024;
public WorkerFrameProtocolOptions(string sessionId)
: this(
sessionId,
GatewayContractInfo.WorkerProtocolVersion,
DefaultMaxMessageBytes)
{
}
public WorkerFrameProtocolOptions(
string sessionId,
uint protocolVersion,
int maxMessageBytes)
{
if (string.IsNullOrWhiteSpace(sessionId))
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.InvalidConfiguration,
"Worker frame protocol requires a session id.");
}
if (protocolVersion == 0)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.InvalidConfiguration,
"Worker frame protocol requires a non-zero protocol version.");
}
if (maxMessageBytes <= 0)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.InvalidConfiguration,
"Worker frame protocol max message size must be greater than zero.");
}
SessionId = sessionId;
ProtocolVersion = protocolVersion;
MaxMessageBytes = maxMessageBytes;
}
public string SessionId { get; }
public uint ProtocolVersion { get; }
public int MaxMessageBytes { get; }
}
@@ -0,0 +1,77 @@
using System.Buffers.Binary;
using Google.Protobuf;
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Workers;
public sealed class WorkerFrameReader
{
private readonly WorkerFrameProtocolOptions _options;
private readonly Stream _stream;
public WorkerFrameReader(
Stream stream,
WorkerFrameProtocolOptions options)
{
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
_options = options ?? throw new ArgumentNullException(nameof(options));
}
public async ValueTask<WorkerEnvelope> ReadAsync(CancellationToken cancellationToken = default)
{
byte[] lengthPrefix = new byte[sizeof(uint)];
await ReadExactlyOrThrowAsync(lengthPrefix, cancellationToken).ConfigureAwait(false);
uint payloadLength = BinaryPrimitives.ReadUInt32LittleEndian(lengthPrefix);
if (payloadLength == 0)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.MalformedLength,
"Worker frame payload length must be greater than zero.");
}
if (payloadLength > _options.MaxMessageBytes)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.MessageTooLarge,
$"Worker frame payload length {payloadLength} exceeds the configured maximum of {_options.MaxMessageBytes} bytes.");
}
byte[] payload = new byte[payloadLength];
await ReadExactlyOrThrowAsync(payload, cancellationToken).ConfigureAwait(false);
WorkerEnvelope envelope;
try
{
envelope = WorkerEnvelope.Parser.ParseFrom(payload);
}
catch (InvalidProtocolBufferException exception)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.InvalidEnvelope,
"Worker frame payload is not a valid WorkerEnvelope protobuf message.",
exception);
}
WorkerEnvelopeValidator.Validate(envelope, _options);
return envelope;
}
private async ValueTask ReadExactlyOrThrowAsync(
Memory<byte> buffer,
CancellationToken cancellationToken)
{
try
{
await _stream.ReadExactlyAsync(buffer, cancellationToken).ConfigureAwait(false);
}
catch (EndOfStreamException exception)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.EndOfStream,
"Worker frame ended before the expected number of bytes were read.",
exception);
}
}
}
@@ -0,0 +1,48 @@
using System.Buffers.Binary;
using Google.Protobuf;
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Workers;
public sealed class WorkerFrameWriter
{
private readonly WorkerFrameProtocolOptions _options;
private readonly Stream _stream;
public WorkerFrameWriter(
Stream stream,
WorkerFrameProtocolOptions options)
{
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
_options = options ?? throw new ArgumentNullException(nameof(options));
}
public async ValueTask WriteAsync(
WorkerEnvelope envelope,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(envelope);
WorkerEnvelopeValidator.Validate(envelope, _options);
int payloadLength = envelope.CalculateSize();
if (payloadLength == 0)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.InvalidEnvelope,
"Worker envelope cannot serialize to an empty payload.");
}
if (payloadLength > _options.MaxMessageBytes)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.MessageTooLarge,
$"Worker envelope payload length {payloadLength} exceeds the configured maximum of {_options.MaxMessageBytes} bytes.");
}
byte[] lengthPrefix = new byte[sizeof(uint)];
BinaryPrimitives.WriteUInt32LittleEndian(lengthPrefix, (uint)payloadLength);
await _stream.WriteAsync(lengthPrefix, cancellationToken).ConfigureAwait(false);
await _stream.WriteAsync(envelope.ToByteArray(), cancellationToken).ConfigureAwait(false);
}
}
@@ -0,0 +1,209 @@
using System.Buffers.Binary;
using Google.Protobuf;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Workers;
namespace MxGateway.Tests.Gateway.Workers;
public sealed class WorkerFrameProtocolTests
{
private const string SessionId = "session-1";
[Fact]
public async Task WriteAndReadAsync_WithValidEnvelope_RoundTripsFrame()
{
WorkerFrameProtocolOptions options = new(SessionId);
await using MemoryStream stream = new();
WorkerEnvelope original = CreateEnvelope();
WorkerFrameWriter writer = new(stream, options);
await writer.WriteAsync(original);
stream.Position = 0;
WorkerFrameReader reader = new(stream, options);
WorkerEnvelope parsed = await reader.ReadAsync();
Assert.Equal(original, parsed);
}
[Fact]
public async Task ReadAsync_WithPartialReads_ReassemblesFrame()
{
WorkerFrameProtocolOptions options = new(SessionId);
WorkerEnvelope original = CreateEnvelope();
byte[] frame = CreateFrame(original);
await using ChunkedReadStream stream = new(frame, chunkSize: 2);
WorkerFrameReader reader = new(stream, options);
WorkerEnvelope parsed = await reader.ReadAsync();
Assert.Equal(original, parsed);
Assert.True(stream.ReadCallCount > 2);
}
[Fact]
public async Task ReadAsync_WithZeroLengthFrame_ThrowsMalformedLength()
{
WorkerFrameProtocolOptions options = new(SessionId);
await using MemoryStream stream = new(new byte[sizeof(uint)]);
WorkerFrameReader reader = new(stream, options);
WorkerFrameProtocolException exception =
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
async () => await reader.ReadAsync());
Assert.Equal(WorkerFrameProtocolErrorCode.MalformedLength, exception.ErrorCode);
}
[Fact]
public async Task ReadAsync_WithOversizedLength_ThrowsBeforePayloadAllocation()
{
WorkerFrameProtocolOptions options = new(SessionId, GatewayContractInfo.WorkerProtocolVersion, maxMessageBytes: 16);
byte[] lengthPrefix = new byte[sizeof(uint)];
BinaryPrimitives.WriteUInt32LittleEndian(lengthPrefix, 17);
await using MemoryStream stream = new(lengthPrefix);
WorkerFrameReader reader = new(stream, options);
WorkerFrameProtocolException exception =
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
async () => await reader.ReadAsync());
Assert.Equal(WorkerFrameProtocolErrorCode.MessageTooLarge, exception.ErrorCode);
}
[Fact]
public async Task ReadAsync_WithWrongProtocolVersion_ThrowsProtocolVersionMismatch()
{
WorkerFrameProtocolOptions options = new(SessionId);
WorkerEnvelope envelope = CreateEnvelope();
envelope.ProtocolVersion++;
await using MemoryStream stream = new(CreateFrame(envelope));
WorkerFrameReader reader = new(stream, options);
WorkerFrameProtocolException exception =
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
async () => await reader.ReadAsync());
Assert.Equal(WorkerFrameProtocolErrorCode.ProtocolVersionMismatch, exception.ErrorCode);
}
[Fact]
public async Task ReadAsync_WithWrongSessionId_ThrowsSessionMismatch()
{
WorkerFrameProtocolOptions options = new(SessionId);
WorkerEnvelope envelope = CreateEnvelope();
envelope.SessionId = "different-session";
await using MemoryStream stream = new(CreateFrame(envelope));
WorkerFrameReader reader = new(stream, options);
WorkerFrameProtocolException exception =
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
async () => await reader.ReadAsync());
Assert.Equal(WorkerFrameProtocolErrorCode.SessionMismatch, exception.ErrorCode);
}
[Fact]
public async Task ReadAsync_WithMalformedPayload_ThrowsInvalidEnvelope()
{
WorkerFrameProtocolOptions options = new(SessionId);
byte[] frame = CreateFrame([0x80]);
await using MemoryStream stream = new(frame);
WorkerFrameReader reader = new(stream, options);
WorkerFrameProtocolException exception =
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
async () => await reader.ReadAsync());
Assert.Equal(WorkerFrameProtocolErrorCode.InvalidEnvelope, exception.ErrorCode);
}
[Fact]
public async Task ReadAsync_WithMissingEnvelopeBody_ThrowsInvalidEnvelope()
{
WorkerFrameProtocolOptions options = new(SessionId);
WorkerEnvelope envelope = CreateEnvelope();
envelope.ClearBody();
await using MemoryStream stream = new(CreateFrame(envelope));
WorkerFrameReader reader = new(stream, options);
WorkerFrameProtocolException exception =
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
async () => await reader.ReadAsync());
Assert.Equal(WorkerFrameProtocolErrorCode.InvalidEnvelope, exception.ErrorCode);
}
[Fact]
public async Task WriteAsync_WithOversizedEnvelope_ThrowsMessageTooLarge()
{
WorkerFrameProtocolOptions options = new(SessionId, GatewayContractInfo.WorkerProtocolVersion, maxMessageBytes: 8);
await using MemoryStream stream = new();
WorkerFrameWriter writer = new(stream, options);
WorkerFrameProtocolException exception =
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
async () => await writer.WriteAsync(CreateEnvelope()));
Assert.Equal(WorkerFrameProtocolErrorCode.MessageTooLarge, exception.ErrorCode);
Assert.Equal(0, stream.Length);
}
private static WorkerEnvelope CreateEnvelope()
{
return new WorkerEnvelope
{
ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
SessionId = SessionId,
Sequence = 1,
CorrelationId = "correlation-1",
WorkerHello = new WorkerHello
{
ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
Nonce = "nonce",
WorkerProcessId = 1234,
WorkerVersion = "test-worker",
},
};
}
private static byte[] CreateFrame(IMessage message)
{
return CreateFrame(message.ToByteArray());
}
private static byte[] CreateFrame(byte[] payload)
{
byte[] frame = new byte[sizeof(uint) + payload.Length];
BinaryPrimitives.WriteUInt32LittleEndian(frame.AsSpan(0, sizeof(uint)), (uint)payload.Length);
payload.CopyTo(frame.AsSpan(sizeof(uint)));
return frame;
}
private sealed class ChunkedReadStream : MemoryStream
{
private readonly int _chunkSize;
public ChunkedReadStream(
byte[] buffer,
int chunkSize)
: base(buffer)
{
_chunkSize = chunkSize;
}
public int ReadCallCount { get; private set; }
public override ValueTask<int> ReadAsync(
Memory<byte> buffer,
CancellationToken cancellationToken = default)
{
ReadCallCount++;
int requestedCount = Math.Min(buffer.Length, _chunkSize);
return base.ReadAsync(buffer[..requestedCount], cancellationToken);
}
}
}
@@ -0,0 +1,280 @@
using Microsoft.AspNetCore.Builder;
using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using MxGateway.Server;
using MxGateway.Server.Configuration;
using MxGateway.Server.Security.Authentication;
namespace MxGateway.Tests.Security.Authentication;
public sealed class SqliteAuthStoreTests
{
[Fact]
public async Task MigrateAsync_EmptyDatabase_InitializesCurrentSchema()
{
string databasePath = CreateTempDatabasePath();
await using ServiceProvider services = BuildAuthServices(databasePath);
IAuthStoreMigrator migrator = services.GetRequiredService<IAuthStoreMigrator>();
await migrator.MigrateAsync(CancellationToken.None);
Assert.Equal(SqliteAuthSchema.CurrentVersion, await ReadSchemaVersionAsync(databasePath));
Assert.True(await TableExistsAsync(databasePath, SqliteAuthSchema.ApiKeysTable));
Assert.True(await TableExistsAsync(databasePath, SqliteAuthSchema.ApiKeyAuditTable));
}
[Fact]
public async Task MigrateAsync_ExistingVersionZeroDatabase_MigratesIdempotently()
{
string databasePath = CreateTempDatabasePath();
await CreateVersionZeroDatabaseAsync(databasePath);
await using ServiceProvider services = BuildAuthServices(databasePath);
IAuthStoreMigrator migrator = services.GetRequiredService<IAuthStoreMigrator>();
await migrator.MigrateAsync(CancellationToken.None);
await migrator.MigrateAsync(CancellationToken.None);
Assert.Equal(SqliteAuthSchema.CurrentVersion, await ReadSchemaVersionAsync(databasePath));
Assert.True(await TableExistsAsync(databasePath, SqliteAuthSchema.ApiKeysTable));
Assert.True(await TableExistsAsync(databasePath, SqliteAuthSchema.ApiKeyAuditTable));
}
[Fact]
public async Task StartAsync_NewerSchemaVersion_BlocksStartup()
{
string databasePath = CreateTempDatabasePath();
await CreateSchemaVersionDatabaseAsync(databasePath, SqliteAuthSchema.CurrentVersion + 1);
await using WebApplication app = GatewayApplication.Build(
[
$"--MxGateway:Authentication:SqlitePath={databasePath}",
"--urls=http://127.0.0.1:0"
]);
AuthStoreMigrationException exception = await Assert.ThrowsAsync<AuthStoreMigrationException>(
() => app.StartAsync(CancellationToken.None));
Assert.Contains("newer than supported version", exception.Message, StringComparison.Ordinal);
}
[Fact]
public async Task FindActiveByKeyIdAsync_ExistingActiveKey_ReturnsKey()
{
string databasePath = CreateTempDatabasePath();
await using ServiceProvider services = BuildAuthServices(databasePath);
await services.GetRequiredService<IAuthStoreMigrator>().MigrateAsync(CancellationToken.None);
await InsertApiKeyAsync(databasePath, revokedUtc: null);
IApiKeyStore store = services.GetRequiredService<IApiKeyStore>();
ApiKeyRecord? key = await store.FindActiveByKeyIdAsync("test-key", CancellationToken.None);
Assert.NotNull(key);
Assert.Equal("test-key", key.KeyId);
Assert.Equal("mxgw_test", key.KeyPrefix);
Assert.Equal([1, 2, 3, 4], key.SecretHash);
Assert.Contains("session:open", key.Scopes);
Assert.Null(key.RevokedUtc);
}
[Fact]
public async Task FindActiveByKeyIdAsync_RevokedKey_ReturnsNull()
{
string databasePath = CreateTempDatabasePath();
await using ServiceProvider services = BuildAuthServices(databasePath);
await services.GetRequiredService<IAuthStoreMigrator>().MigrateAsync(CancellationToken.None);
await InsertApiKeyAsync(databasePath, DateTimeOffset.UtcNow);
IApiKeyStore store = services.GetRequiredService<IApiKeyStore>();
ApiKeyRecord? activeKey = await store.FindActiveByKeyIdAsync(
"test-key",
CancellationToken.None);
ApiKeyRecord? storedKey = await store.FindByKeyIdAsync("test-key", CancellationToken.None);
Assert.Null(activeKey);
Assert.NotNull(storedKey);
Assert.NotNull(storedKey.RevokedUtc);
}
[Fact]
public async Task ApiKeyAuditStore_AppendAsync_PersistsAuditEvent()
{
string databasePath = CreateTempDatabasePath();
await using ServiceProvider services = BuildAuthServices(databasePath);
await services.GetRequiredService<IAuthStoreMigrator>().MigrateAsync(CancellationToken.None);
IApiKeyAuditStore auditStore = services.GetRequiredService<IApiKeyAuditStore>();
await auditStore.AppendAsync(
new ApiKeyAuditEntry(
KeyId: "test-key",
EventType: "lookup",
RemoteAddress: "127.0.0.1",
Details: "matched active key"),
CancellationToken.None);
IReadOnlyList<ApiKeyAuditRecord> records = await auditStore.ListRecentAsync(
10,
CancellationToken.None);
ApiKeyAuditRecord record = Assert.Single(records);
Assert.Equal("test-key", record.KeyId);
Assert.Equal("lookup", record.EventType);
Assert.Equal("127.0.0.1", record.RemoteAddress);
Assert.Equal("matched active key", record.Details);
}
private static ServiceProvider BuildAuthServices(string databasePath)
{
IConfigurationRoot configuration = new ConfigurationBuilder()
.AddInMemoryCollection(
new Dictionary<string, string?>
{
["MxGateway:Authentication:SqlitePath"] = databasePath
})
.Build();
ServiceCollection services = new();
services.AddSingleton<IConfiguration>(configuration);
services.AddGatewayConfiguration();
services.AddSqliteAuthStore();
return services.BuildServiceProvider(validateScopes: true);
}
private static string CreateTempDatabasePath()
{
string directory = Path.Combine(Path.GetTempPath(), "mxgateway-auth-tests", Guid.NewGuid().ToString("N"));
Directory.CreateDirectory(directory);
return Path.Combine(directory, "gateway-auth.db");
}
private static async Task CreateVersionZeroDatabaseAsync(string databasePath)
{
await using SqliteConnection connection = CreateConnection(databasePath);
await connection.OpenAsync(CancellationToken.None);
await using SqliteCommand command = connection.CreateCommand();
command.CommandText = """
CREATE TABLE schema_version (
id INTEGER PRIMARY KEY CHECK (id = 1),
version INTEGER NOT NULL,
applied_utc TEXT NOT NULL
);
INSERT INTO schema_version (id, version, applied_utc)
VALUES (1, 0, $applied_utc);
""";
command.Parameters.AddWithValue("$applied_utc", DateTimeOffset.UtcNow.ToString("O"));
await command.ExecuteNonQueryAsync(CancellationToken.None);
}
private static async Task CreateSchemaVersionDatabaseAsync(string databasePath, int version)
{
await using SqliteConnection connection = CreateConnection(databasePath);
await connection.OpenAsync(CancellationToken.None);
await using SqliteCommand command = connection.CreateCommand();
command.CommandText = """
CREATE TABLE schema_version (
id INTEGER PRIMARY KEY CHECK (id = 1),
version INTEGER NOT NULL,
applied_utc TEXT NOT NULL
);
INSERT INTO schema_version (id, version, applied_utc)
VALUES (1, $version, $applied_utc);
""";
command.Parameters.AddWithValue("$version", version);
command.Parameters.AddWithValue("$applied_utc", DateTimeOffset.UtcNow.ToString("O"));
await command.ExecuteNonQueryAsync(CancellationToken.None);
}
private static async Task InsertApiKeyAsync(string databasePath, DateTimeOffset? revokedUtc)
{
await using SqliteConnection connection = CreateConnection(databasePath);
await connection.OpenAsync(CancellationToken.None);
await using SqliteCommand command = connection.CreateCommand();
command.CommandText = """
INSERT INTO api_keys (
key_id,
key_prefix,
secret_hash,
display_name,
scopes,
created_utc,
last_used_utc,
revoked_utc)
VALUES (
$key_id,
$key_prefix,
$secret_hash,
$display_name,
$scopes,
$created_utc,
NULL,
$revoked_utc);
""";
command.Parameters.AddWithValue("$key_id", "test-key");
command.Parameters.AddWithValue("$key_prefix", "mxgw_test");
command.Parameters.Add("$secret_hash", SqliteType.Blob).Value = new byte[] { 1, 2, 3, 4 };
command.Parameters.AddWithValue("$display_name", "Test Key");
command.Parameters.AddWithValue(
"$scopes",
ApiKeyScopeSerializer.Serialize(new HashSet<string>(StringComparer.Ordinal) { "session:open", "events:read" }));
command.Parameters.AddWithValue("$created_utc", DateTimeOffset.UtcNow.ToString("O"));
command.Parameters.AddWithValue("$revoked_utc", revokedUtc?.ToString("O") ?? (object)DBNull.Value);
await command.ExecuteNonQueryAsync(CancellationToken.None);
}
private static async Task<int> ReadSchemaVersionAsync(string databasePath)
{
await using SqliteConnection connection = CreateConnection(databasePath);
await connection.OpenAsync(CancellationToken.None);
await using SqliteCommand command = connection.CreateCommand();
command.CommandText = "SELECT version FROM schema_version WHERE id = 1;";
object? result = await command.ExecuteScalarAsync(CancellationToken.None);
return Convert.ToInt32(result, System.Globalization.CultureInfo.InvariantCulture);
}
private static async Task<bool> TableExistsAsync(string databasePath, string tableName)
{
await using SqliteConnection connection = CreateConnection(databasePath);
await connection.OpenAsync(CancellationToken.None);
await using SqliteCommand command = connection.CreateCommand();
command.CommandText = """
SELECT COUNT(*)
FROM sqlite_master
WHERE type = 'table' AND name = $table_name;
""";
command.Parameters.AddWithValue("$table_name", tableName);
long result = (long)(await command.ExecuteScalarAsync(CancellationToken.None) ?? 0L);
return result == 1;
}
private static SqliteConnection CreateConnection(string databasePath)
{
SqliteConnectionStringBuilder builder = new()
{
DataSource = databasePath,
Mode = SqliteOpenMode.ReadWriteCreate
};
return new SqliteConnection(builder.ToString());
}
}