Compare commits

...

8 Commits

Author SHA1 Message Date
Joseph Doherty a5098e6815 Issue #9: implement worker frame protocol 2026-04-26 16:20:59 -04:00
dohertj2 41ddd122a6 Merge PR #53: Issue #4 add structured logging and metrics foundation
Verified after merging origin/main and resolving startup/test conflicts with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln.
2026-04-26 16:19:24 -04:00
Joseph Doherty a25f09e795 Merge remote-tracking branch 'origin/main' into agent-3/issue-4-add-structured-logging-and-metrics-foundation
# Conflicts:
#	src/MxGateway.Server/GatewayApplication.cs
#	src/MxGateway.Tests/Gateway/GatewayApplicationTests.cs
2026-04-26 16:17:22 -04:00
dohertj2 37da9d8f44 Merge PR #54: Issue #3 add gateway configuration and validation
Verified after merging origin/main with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln.
2026-04-26 16:17:20 -04:00
Joseph Doherty a19af5f7cb Merge remote-tracking branch 'origin/main' into agent-2/issue-3-add-gateway-configuration-and-validation 2026-04-26 16:14:23 -04:00
dohertj2 03ab36c4d5 Merge PR #52: Issue #2 define protobuf contracts
Verified with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 16:14:07 -04:00
Joseph Doherty 91ea71b0b7 Issue #3: add gateway configuration and validation 2026-04-26 16:11:30 -04:00
Joseph Doherty 7dfec6dc8c Issue #4: add structured logging and metrics foundation 2026-04-26 16:10:58 -04:00
44 changed files with 1860 additions and 2 deletions
+54
View File
@@ -0,0 +1,54 @@
# Worker Frame Protocol
The gateway uses the worker frame protocol to move `WorkerEnvelope` protobuf
messages over a bidirectional named pipe. The frame layer is deliberately small:
it handles message boundaries, size limits, protobuf parsing, and envelope
validation before higher-level worker client code routes commands, replies,
events, and faults.
## Frame Format
Each frame starts with a four-byte little-endian unsigned payload length,
followed by the serialized `WorkerEnvelope` payload:
```text
uint32 little-endian payload_length
payload_length bytes protobuf WorkerEnvelope
```
The reader rejects zero-length payloads and payloads larger than the configured
maximum before allocating the payload buffer. The default maximum is 16 MiB,
matching the gateway process design.
## Envelope Validation
`WorkerFrameReader` and `WorkerFrameWriter` validate each envelope against the
owning session before returning or writing it:
- `protocol_version` must match the configured worker protocol version,
- `session_id` must match the owning gateway session,
- the envelope must contain one typed `body` value.
Protocol violations throw `WorkerFrameProtocolException` with a
`WorkerFrameProtocolErrorCode` so callers can distinguish malformed frames,
oversized frames, protocol version mismatches, and session mismatches.
## Verification
Run the focused tests after changing the frame protocol:
```bash
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter WorkerFrameProtocolTests
```
Run the gateway build because the frame protocol is part of
`MxGateway.Server`:
```bash
dotnet build src/MxGateway.Server/MxGateway.Server.csproj
```
## Related Documentation
- [Gateway Process Detailed Design](./gateway-process-design.md)
- [Protobuf Contracts](./Contracts.md)
+6 -1
View File
@@ -105,6 +105,12 @@ Do not let Razor components directly mutate gateway session or worker objects.
Create a small read-only dashboard service that projects gateway state into
plain DTOs.
`GatewayMetrics.GetSnapshot()` is the metrics input for the first dashboard
projection. It carries current session and worker gauges, command and event
counters, queue depth, and fault totals. The dashboard reads that snapshot
instead of reading raw `Meter` instruments because exporter configuration is an
operations concern, not a UI dependency.
Suggested service:
```csharp
@@ -361,4 +367,3 @@ The first dashboard slice should implement:
8. workers page with worker table.
9. 1-second realtime refresh through Blazor Server.
10. redaction tests for secrets.
+32
View File
@@ -664,6 +664,26 @@ Metrics:
Do not log credential values or full tag values by default.
The gateway registers `GatewayMetrics` as the in-process metrics foundation.
It emits .NET `Meter` instruments for collectors and keeps a
`GatewayMetricsSnapshot` for dashboard projection. The snapshot exists because
the dashboard needs current counters and queue depths without depending on a
specific metrics exporter.
HTTP request handling uses `UseGatewayRequestLoggingScope()` to attach common
structured log fields when request metadata is present:
- `SessionId`,
- `ClientIdentity`,
- `WorkerProcessId`,
- `CorrelationId`,
- `CommandMethod`.
`GatewayLogRedactor` redacts API key secrets and command values before they are
added to log state. Value logging remains opt-in and redacted by default so
secured writes, authentication commands, and ordinary tag values do not leak
through diagnostics.
## Configuration
Suggested configuration shape:
@@ -710,6 +730,18 @@ Suggested configuration shape:
Do not scatter connection or path constants through implementation code.
`MxGateway.Server` binds this section to `GatewayOptions` at startup and
registers validation with `ValidateOnStart()`. Startup fails before the gateway
begins serving traffic when required authentication settings are missing,
timeouts or queue sizes are not positive, dashboard settings are malformed, or
the configured worker protocol version does not match the contract version.
The gateway exposes read-only effective settings through
`IGatewayConfigurationProvider`. This projection is for dashboard settings and
diagnostics, so it redacts secret-related fields such as
`Authentication:PepperSecretName` and does not include raw API keys or key
material.
## Galaxy Repository Metadata
Galaxy hierarchy and tag metadata can be discovered through SQL Server when
+9
View File
@@ -45,6 +45,8 @@ Detailed follow-up docs:
- `docs/gateway-process-design.md` covers the .NET 10 gateway process,
session manager, worker supervision, gRPC API, event streaming, fault model,
security, observability, and test strategy.
- `docs/WorkerFrameProtocol.md` covers the gateway-side named-pipe frame
reader/writer and `WorkerEnvelope` validation rules.
- `docs/mxaccess-worker-instance-design.md` covers each .NET Framework 4.8 x86
MXAccess worker instance, including STA ownership, message pumping, COM
lifetime, command dispatch, event sinks, conversion, and shutdown.
@@ -97,6 +99,13 @@ Responsibilities:
The gateway must never instantiate or call MXAccess directly.
The gateway observability foundation lives in `MxGateway.Server.Diagnostics`
and `MxGateway.Server.Metrics`. Structured logging scopes carry session,
worker, correlation, command, and client identity fields with redaction applied
before values enter log state. `GatewayMetrics` exposes counters, gauges, and
histograms through .NET `Meter` and a snapshot API that dashboard services can
project without binding to a metrics exporter.
### Worker Process
Runtime:
@@ -0,0 +1,7 @@
namespace MxGateway.Server.Configuration;
public enum AuthenticationMode
{
ApiKey,
Disabled
}
@@ -0,0 +1,12 @@
namespace MxGateway.Server.Configuration;
public sealed class AuthenticationOptions
{
public AuthenticationMode Mode { get; init; } = AuthenticationMode.ApiKey;
public string SqlitePath { get; init; } = @"C:\ProgramData\MxGateway\gateway-auth.db";
public string PepperSecretName { get; init; } = "MxGateway:ApiKeyPepper";
public bool RunMigrationsOnStartup { get; init; } = true;
}
@@ -0,0 +1,20 @@
namespace MxGateway.Server.Configuration;
public sealed class DashboardOptions
{
public bool Enabled { get; init; } = true;
public string PathBase { get; init; } = "/dashboard";
public bool RequireAdminScope { get; init; } = true;
public bool AllowAnonymousLocalhost { get; init; }
public int SnapshotIntervalMilliseconds { get; init; } = 1_000;
public int RecentFaultLimit { get; init; } = 100;
public int RecentSessionLimit { get; init; } = 200;
public bool ShowTagValues { get; init; }
}
@@ -0,0 +1,7 @@
namespace MxGateway.Server.Configuration;
public sealed record EffectiveAuthenticationConfiguration(
string Mode,
string SqlitePath,
string PepperSecretName,
bool RunMigrationsOnStartup);
@@ -0,0 +1,11 @@
namespace MxGateway.Server.Configuration;
public sealed record EffectiveDashboardConfiguration(
bool Enabled,
string PathBase,
bool RequireAdminScope,
bool AllowAnonymousLocalhost,
int SnapshotIntervalMilliseconds,
int RecentFaultLimit,
int RecentSessionLimit,
bool ShowTagValues);
@@ -0,0 +1,5 @@
namespace MxGateway.Server.Configuration;
public sealed record EffectiveEventConfiguration(
int QueueCapacity,
string BackpressurePolicy);
@@ -0,0 +1,9 @@
namespace MxGateway.Server.Configuration;
public sealed record EffectiveGatewayConfiguration(
EffectiveAuthenticationConfiguration Authentication,
EffectiveWorkerConfiguration Worker,
EffectiveSessionConfiguration Sessions,
EffectiveEventConfiguration Events,
EffectiveDashboardConfiguration Dashboard,
EffectiveProtocolConfiguration Protocol);
@@ -0,0 +1,3 @@
namespace MxGateway.Server.Configuration;
public sealed record EffectiveProtocolConfiguration(uint WorkerProtocolVersion);
@@ -0,0 +1,6 @@
namespace MxGateway.Server.Configuration;
public sealed record EffectiveSessionConfiguration(
int DefaultCommandTimeoutSeconds,
int MaxSessions,
bool AllowMultipleEventSubscribers);
@@ -0,0 +1,11 @@
namespace MxGateway.Server.Configuration;
public sealed record EffectiveWorkerConfiguration(
string ExecutablePath,
string? WorkingDirectory,
string RequiredArchitecture,
int StartupTimeoutSeconds,
int ShutdownTimeoutSeconds,
int HeartbeatIntervalSeconds,
int HeartbeatGraceSeconds,
int MaxMessageBytes);
@@ -0,0 +1,6 @@
namespace MxGateway.Server.Configuration;
public enum EventBackpressurePolicy
{
FailFast
}
@@ -0,0 +1,8 @@
namespace MxGateway.Server.Configuration;
public sealed class EventOptions
{
public int QueueCapacity { get; init; } = 10_000;
public EventBackpressurePolicy BackpressurePolicy { get; init; } = EventBackpressurePolicy.FailFast;
}
@@ -0,0 +1,46 @@
using Microsoft.Extensions.Options;
namespace MxGateway.Server.Configuration;
public sealed class GatewayConfigurationProvider(IOptions<GatewayOptions> options) : IGatewayConfigurationProvider
{
public const string RedactedValue = "[redacted]";
public EffectiveGatewayConfiguration GetEffectiveConfiguration()
{
GatewayOptions value = options.Value;
return new EffectiveGatewayConfiguration(
Authentication: new EffectiveAuthenticationConfiguration(
Mode: value.Authentication.Mode.ToString(),
SqlitePath: value.Authentication.SqlitePath,
PepperSecretName: RedactedValue,
RunMigrationsOnStartup: value.Authentication.RunMigrationsOnStartup),
Worker: new EffectiveWorkerConfiguration(
ExecutablePath: value.Worker.ExecutablePath,
WorkingDirectory: value.Worker.WorkingDirectory,
RequiredArchitecture: value.Worker.RequiredArchitecture.ToString(),
StartupTimeoutSeconds: value.Worker.StartupTimeoutSeconds,
ShutdownTimeoutSeconds: value.Worker.ShutdownTimeoutSeconds,
HeartbeatIntervalSeconds: value.Worker.HeartbeatIntervalSeconds,
HeartbeatGraceSeconds: value.Worker.HeartbeatGraceSeconds,
MaxMessageBytes: value.Worker.MaxMessageBytes),
Sessions: new EffectiveSessionConfiguration(
DefaultCommandTimeoutSeconds: value.Sessions.DefaultCommandTimeoutSeconds,
MaxSessions: value.Sessions.MaxSessions,
AllowMultipleEventSubscribers: value.Sessions.AllowMultipleEventSubscribers),
Events: new EffectiveEventConfiguration(
QueueCapacity: value.Events.QueueCapacity,
BackpressurePolicy: value.Events.BackpressurePolicy.ToString()),
Dashboard: new EffectiveDashboardConfiguration(
Enabled: value.Dashboard.Enabled,
PathBase: value.Dashboard.PathBase,
RequireAdminScope: value.Dashboard.RequireAdminScope,
AllowAnonymousLocalhost: value.Dashboard.AllowAnonymousLocalhost,
SnapshotIntervalMilliseconds: value.Dashboard.SnapshotIntervalMilliseconds,
RecentFaultLimit: value.Dashboard.RecentFaultLimit,
RecentSessionLimit: value.Dashboard.RecentSessionLimit,
ShowTagValues: value.Dashboard.ShowTagValues),
Protocol: new EffectiveProtocolConfiguration(value.Protocol.WorkerProtocolVersion));
}
}
@@ -0,0 +1,19 @@
using Microsoft.Extensions.Options;
namespace MxGateway.Server.Configuration;
public static class GatewayConfigurationServiceCollectionExtensions
{
public static IServiceCollection AddGatewayConfiguration(this IServiceCollection services)
{
services
.AddOptions<GatewayOptions>()
.BindConfiguration(GatewayOptions.SectionName)
.ValidateOnStart();
services.AddSingleton<IValidateOptions<GatewayOptions>, GatewayOptionsValidator>();
services.AddSingleton<IGatewayConfigurationProvider, GatewayConfigurationProvider>();
return services;
}
}
@@ -0,0 +1,18 @@
namespace MxGateway.Server.Configuration;
public sealed class GatewayOptions
{
public const string SectionName = "MxGateway";
public AuthenticationOptions Authentication { get; init; } = new();
public WorkerOptions Worker { get; init; } = new();
public SessionOptions Sessions { get; init; } = new();
public EventOptions Events { get; init; } = new();
public DashboardOptions Dashboard { get; init; } = new();
public ProtocolOptions Protocol { get; init; } = new();
}
@@ -0,0 +1,210 @@
using Microsoft.Extensions.Options;
using MxGateway.Contracts;
namespace MxGateway.Server.Configuration;
public sealed class GatewayOptionsValidator : IValidateOptions<GatewayOptions>
{
private const int MinimumMaxMessageBytes = 1024;
private const int MaximumMaxMessageBytes = 256 * 1024 * 1024;
public ValidateOptionsResult Validate(string? name, GatewayOptions options)
{
List<string> failures = [];
ValidateAuthentication(options.Authentication, failures);
ValidateWorker(options.Worker, failures);
ValidateSessions(options.Sessions, failures);
ValidateEvents(options.Events, failures);
ValidateDashboard(options.Dashboard, failures);
ValidateProtocol(options.Protocol, failures);
return failures.Count == 0
? ValidateOptionsResult.Success
: ValidateOptionsResult.Fail(failures);
}
private static void ValidateAuthentication(AuthenticationOptions options, List<string> failures)
{
if (!Enum.IsDefined(options.Mode))
{
failures.Add("MxGateway:Authentication:Mode must be a supported authentication mode.");
return;
}
if (options.Mode == AuthenticationMode.ApiKey)
{
AddIfBlank(
options.SqlitePath,
"MxGateway:Authentication:SqlitePath is required when API-key authentication is enabled.",
failures);
AddIfInvalidPath(
options.SqlitePath,
"MxGateway:Authentication:SqlitePath must be a valid filesystem path.",
failures);
AddIfBlank(
options.PepperSecretName,
"MxGateway:Authentication:PepperSecretName is required when API-key authentication is enabled.",
failures);
}
}
private static void ValidateWorker(WorkerOptions options, List<string> failures)
{
AddIfBlank(options.ExecutablePath, "MxGateway:Worker:ExecutablePath is required.", failures);
AddIfInvalidPath(
options.ExecutablePath,
"MxGateway:Worker:ExecutablePath must be a valid filesystem path.",
failures);
if (!string.IsNullOrWhiteSpace(options.ExecutablePath)
&& !string.Equals(Path.GetExtension(options.ExecutablePath), ".exe", StringComparison.OrdinalIgnoreCase))
{
failures.Add("MxGateway:Worker:ExecutablePath must point to a .exe file.");
}
if (!string.IsNullOrWhiteSpace(options.WorkingDirectory))
{
AddIfInvalidPath(
options.WorkingDirectory,
"MxGateway:Worker:WorkingDirectory must be a valid filesystem path.",
failures);
}
if (!Enum.IsDefined(options.RequiredArchitecture))
{
failures.Add("MxGateway:Worker:RequiredArchitecture must be a supported worker architecture.");
}
AddIfNotPositive(
options.StartupTimeoutSeconds,
"MxGateway:Worker:StartupTimeoutSeconds must be greater than zero.",
failures);
AddIfNotPositive(
options.ShutdownTimeoutSeconds,
"MxGateway:Worker:ShutdownTimeoutSeconds must be greater than zero.",
failures);
AddIfNotPositive(
options.HeartbeatIntervalSeconds,
"MxGateway:Worker:HeartbeatIntervalSeconds must be greater than zero.",
failures);
AddIfNotPositive(
options.HeartbeatGraceSeconds,
"MxGateway:Worker:HeartbeatGraceSeconds must be greater than zero.",
failures);
if (options.HeartbeatGraceSeconds < options.HeartbeatIntervalSeconds)
{
failures.Add(
"MxGateway:Worker:HeartbeatGraceSeconds must be greater than or equal to HeartbeatIntervalSeconds.");
}
if (options.MaxMessageBytes is < MinimumMaxMessageBytes or > MaximumMaxMessageBytes)
{
failures.Add(
$"MxGateway:Worker:MaxMessageBytes must be between {MinimumMaxMessageBytes} and {MaximumMaxMessageBytes}.");
}
}
private static void ValidateSessions(SessionOptions options, List<string> failures)
{
AddIfNotPositive(
options.DefaultCommandTimeoutSeconds,
"MxGateway:Sessions:DefaultCommandTimeoutSeconds must be greater than zero.",
failures);
AddIfNotPositive(options.MaxSessions, "MxGateway:Sessions:MaxSessions must be greater than zero.", failures);
}
private static void ValidateEvents(EventOptions options, List<string> failures)
{
AddIfNotPositive(options.QueueCapacity, "MxGateway:Events:QueueCapacity must be greater than zero.", failures);
if (!Enum.IsDefined(options.BackpressurePolicy))
{
failures.Add("MxGateway:Events:BackpressurePolicy must be a supported backpressure policy.");
}
}
private static void ValidateDashboard(DashboardOptions options, List<string> failures)
{
if (options.Enabled)
{
AddIfBlank(options.PathBase, "MxGateway:Dashboard:PathBase is required when the dashboard is enabled.", failures);
if (!string.IsNullOrWhiteSpace(options.PathBase) && !options.PathBase.StartsWith('/'))
{
failures.Add("MxGateway:Dashboard:PathBase must start with '/'.");
}
}
AddIfNotPositive(
options.SnapshotIntervalMilliseconds,
"MxGateway:Dashboard:SnapshotIntervalMilliseconds must be greater than zero.",
failures);
AddIfNegative(
options.RecentFaultLimit,
"MxGateway:Dashboard:RecentFaultLimit must be greater than or equal to zero.",
failures);
AddIfNegative(
options.RecentSessionLimit,
"MxGateway:Dashboard:RecentSessionLimit must be greater than or equal to zero.",
failures);
}
private static void ValidateProtocol(ProtocolOptions options, List<string> failures)
{
if (options.WorkerProtocolVersion != GatewayContractInfo.WorkerProtocolVersion)
{
failures.Add(
$"MxGateway:Protocol:WorkerProtocolVersion must be {GatewayContractInfo.WorkerProtocolVersion}.");
}
}
private static void AddIfBlank(string? value, string message, List<string> failures)
{
if (string.IsNullOrWhiteSpace(value))
{
failures.Add(message);
}
}
private static void AddIfNotPositive(int value, string message, List<string> failures)
{
if (value <= 0)
{
failures.Add(message);
}
}
private static void AddIfNegative(int value, string message, List<string> failures)
{
if (value < 0)
{
failures.Add(message);
}
}
private static void AddIfInvalidPath(string? value, string message, List<string> failures)
{
if (string.IsNullOrWhiteSpace(value))
{
return;
}
try
{
_ = Path.GetFullPath(value);
}
catch (ArgumentException)
{
failures.Add(message);
}
catch (NotSupportedException)
{
failures.Add(message);
}
catch (PathTooLongException)
{
failures.Add(message);
}
}
}
@@ -0,0 +1,6 @@
namespace MxGateway.Server.Configuration;
public interface IGatewayConfigurationProvider
{
EffectiveGatewayConfiguration GetEffectiveConfiguration();
}
@@ -0,0 +1,8 @@
using MxGateway.Contracts;
namespace MxGateway.Server.Configuration;
public sealed class ProtocolOptions
{
public uint WorkerProtocolVersion { get; init; } = GatewayContractInfo.WorkerProtocolVersion;
}
@@ -0,0 +1,10 @@
namespace MxGateway.Server.Configuration;
public sealed class SessionOptions
{
public int DefaultCommandTimeoutSeconds { get; init; } = 30;
public int MaxSessions { get; init; } = 64;
public bool AllowMultipleEventSubscribers { get; init; }
}
@@ -0,0 +1,7 @@
namespace MxGateway.Server.Configuration;
public enum WorkerArchitecture
{
X86,
X64
}
@@ -0,0 +1,21 @@
namespace MxGateway.Server.Configuration;
public sealed class WorkerOptions
{
public string ExecutablePath { get; init; } =
@"src\MxGateway.Worker\bin\x86\Release\MxGateway.Worker.exe";
public string? WorkingDirectory { get; init; }
public WorkerArchitecture RequiredArchitecture { get; init; } = WorkerArchitecture.X86;
public int StartupTimeoutSeconds { get; init; } = 30;
public int ShutdownTimeoutSeconds { get; init; } = 10;
public int HeartbeatIntervalSeconds { get; init; } = 5;
public int HeartbeatGraceSeconds { get; init; } = 15;
public int MaxMessageBytes { get; init; } = 16 * 1024 * 1024;
}
@@ -0,0 +1,78 @@
namespace MxGateway.Server.Diagnostics;
public static class GatewayLogRedactor
{
public const string RedactedValue = "[redacted]";
private static readonly HashSet<string> SensitiveCommandMethods = new(StringComparer.OrdinalIgnoreCase)
{
"AuthenticateUser",
"WriteSecured",
"WriteSecured2"
};
public static bool IsCredentialBearingCommand(string? commandMethod)
{
return commandMethod is not null
&& SensitiveCommandMethods.Contains(commandMethod);
}
public static string? RedactApiKey(string? authorizationHeader)
{
if (string.IsNullOrWhiteSpace(authorizationHeader))
{
return authorizationHeader;
}
const string bearerPrefix = "Bearer ";
if (!authorizationHeader.StartsWith(bearerPrefix, StringComparison.OrdinalIgnoreCase))
{
return RedactedValue;
}
string token = authorizationHeader[bearerPrefix.Length..].Trim();
if (!token.StartsWith("mxgw_", StringComparison.OrdinalIgnoreCase))
{
return $"{bearerPrefix}{RedactedValue}";
}
string[] tokenParts = token.Split('_', 3, StringSplitOptions.RemoveEmptyEntries);
if (tokenParts.Length < 2)
{
return $"{bearerPrefix}mxgw_{RedactedValue}";
}
return $"{bearerPrefix}mxgw_{tokenParts[1]}_{RedactedValue}";
}
public static string? RedactClientIdentity(string? clientIdentity)
{
if (string.IsNullOrWhiteSpace(clientIdentity))
{
return clientIdentity;
}
return clientIdentity.Contains("mxgw_", StringComparison.OrdinalIgnoreCase)
? RedactApiKey(clientIdentity)
: clientIdentity;
}
public static object? RedactCommandValue(
string? commandMethod,
object? value,
bool valueLoggingEnabled = false)
{
if (value is null)
{
return null;
}
if (!valueLoggingEnabled || IsCredentialBearingCommand(commandMethod))
{
return RedactedValue;
}
return value;
}
}
@@ -0,0 +1,33 @@
namespace MxGateway.Server.Diagnostics;
public sealed record GatewayLogScope(
string? SessionId = null,
int? WorkerProcessId = null,
ulong? CorrelationId = null,
string? CommandMethod = null,
string? ClientIdentity = null)
{
public IReadOnlyDictionary<string, object?> ToDictionary()
{
Dictionary<string, object?> values = [];
AddIfPresent(values, "SessionId", SessionId);
AddIfPresent(values, "WorkerProcessId", WorkerProcessId);
AddIfPresent(values, "CorrelationId", CorrelationId);
AddIfPresent(values, "CommandMethod", CommandMethod);
AddIfPresent(values, "ClientIdentity", GatewayLogRedactor.RedactClientIdentity(ClientIdentity));
return values;
}
private static void AddIfPresent(
Dictionary<string, object?> values,
string key,
object? value)
{
if (value is not null)
{
values[key] = value;
}
}
}
@@ -0,0 +1,16 @@
using Microsoft.Extensions.Logging;
namespace MxGateway.Server.Diagnostics;
public static class GatewayLoggerExtensions
{
public static IDisposable? BeginGatewayScope(
this ILogger logger,
GatewayLogScope scope)
{
ArgumentNullException.ThrowIfNull(logger);
ArgumentNullException.ThrowIfNull(scope);
return logger.BeginScope(scope.ToDictionary());
}
}
@@ -0,0 +1,57 @@
using Microsoft.Extensions.Primitives;
namespace MxGateway.Server.Diagnostics;
public static class GatewayRequestLoggingMiddlewareExtensions
{
public const string SessionIdHeaderName = "x-session-id";
public const string WorkerProcessIdHeaderName = "x-worker-process-id";
public const string CorrelationIdHeaderName = "x-correlation-id";
public const string CommandMethodHeaderName = "x-command-method";
public static IApplicationBuilder UseGatewayRequestLoggingScope(this IApplicationBuilder app)
{
ArgumentNullException.ThrowIfNull(app);
return app.Use(async (context, next) =>
{
ILogger logger = context.RequestServices
.GetRequiredService<ILoggerFactory>()
.CreateLogger("MxGateway.Request");
using IDisposable? scope = logger.BeginGatewayScope(new GatewayLogScope(
SessionId: ReadHeader(context, SessionIdHeaderName),
WorkerProcessId: ReadInt32Header(context, WorkerProcessIdHeaderName),
CorrelationId: ReadUInt64Header(context, CorrelationIdHeaderName),
CommandMethod: ReadHeader(context, CommandMethodHeaderName),
ClientIdentity: ReadHeader(context, "authorization")));
await next(context);
});
}
private static string? ReadHeader(HttpContext context, string headerName)
{
return context.Request.Headers.TryGetValue(headerName, out StringValues values)
? values.ToString()
: null;
}
private static int? ReadInt32Header(HttpContext context, string headerName)
{
string? value = ReadHeader(context, headerName);
return int.TryParse(value, out int parsedValue)
? parsedValue
: null;
}
private static ulong? ReadUInt64Header(HttpContext context, string headerName)
{
string? value = ReadHeader(context, headerName);
return ulong.TryParse(value, out ulong parsedValue)
? parsedValue
: null;
}
}
@@ -1,4 +1,7 @@
using MxGateway.Contracts;
using MxGateway.Server.Configuration;
using MxGateway.Server.Diagnostics;
using MxGateway.Server.Metrics;
namespace MxGateway.Server;
@@ -9,6 +12,7 @@ public static class GatewayApplication
WebApplicationBuilder builder = CreateBuilder(args);
WebApplication app = builder.Build();
app.UseGatewayRequestLoggingScope();
app.MapGatewayEndpoints();
return app;
@@ -18,7 +22,9 @@ public static class GatewayApplication
{
WebApplicationBuilder builder = WebApplication.CreateBuilder(args);
builder.Services.AddGatewayConfiguration();
builder.Services.AddHealthChecks();
builder.Services.AddSingleton<GatewayMetrics>();
return builder;
}
@@ -0,0 +1,306 @@
using System.Diagnostics.Metrics;
namespace MxGateway.Server.Metrics;
public sealed class GatewayMetrics : IDisposable
{
public const string MeterName = "MxGateway.Server";
private readonly object _syncRoot = new();
private readonly Meter _meter;
private readonly Counter<long> _sessionsOpenedCounter;
private readonly Counter<long> _sessionsClosedCounter;
private readonly Counter<long> _commandsStartedCounter;
private readonly Counter<long> _commandsSucceededCounter;
private readonly Counter<long> _commandsFailedCounter;
private readonly Counter<long> _eventsReceivedCounter;
private readonly Counter<long> _queueOverflowsCounter;
private readonly Counter<long> _faultsCounter;
private readonly Counter<long> _workerKillsCounter;
private readonly Counter<long> _workerExitsCounter;
private readonly Counter<long> _heartbeatFailuresCounter;
private readonly Counter<long> _streamDisconnectsCounter;
private readonly Histogram<double> _workerStartupLatencyHistogram;
private readonly Histogram<double> _commandLatencyHistogram;
private readonly Histogram<double> _eventStreamSendLatencyHistogram;
private readonly Dictionary<string, long> _commandFailuresByMethod = new(StringComparer.OrdinalIgnoreCase);
private readonly Dictionary<string, long> _eventsByFamily = new(StringComparer.OrdinalIgnoreCase);
private int _openSessions;
private int _workersRunning;
private int _eventQueueDepth;
private long _sessionsOpened;
private long _sessionsClosed;
private long _commandsStarted;
private long _commandsSucceeded;
private long _commandsFailed;
private long _eventsReceived;
private long _queueOverflows;
private long _faults;
private long _workerKills;
private long _workerExits;
private long _heartbeatFailures;
private long _streamDisconnects;
private bool _disposed;
public GatewayMetrics()
{
_meter = new Meter(MeterName, typeof(GatewayMetrics).Assembly.GetName().Version?.ToString());
_sessionsOpenedCounter = _meter.CreateCounter<long>("mxgateway.sessions.opened");
_sessionsClosedCounter = _meter.CreateCounter<long>("mxgateway.sessions.closed");
_commandsStartedCounter = _meter.CreateCounter<long>("mxgateway.commands.started");
_commandsSucceededCounter = _meter.CreateCounter<long>("mxgateway.commands.succeeded");
_commandsFailedCounter = _meter.CreateCounter<long>("mxgateway.commands.failed");
_eventsReceivedCounter = _meter.CreateCounter<long>("mxgateway.events.received");
_queueOverflowsCounter = _meter.CreateCounter<long>("mxgateway.queues.overflows");
_faultsCounter = _meter.CreateCounter<long>("mxgateway.faults");
_workerKillsCounter = _meter.CreateCounter<long>("mxgateway.workers.killed");
_workerExitsCounter = _meter.CreateCounter<long>("mxgateway.workers.exited");
_heartbeatFailuresCounter = _meter.CreateCounter<long>("mxgateway.heartbeats.failed");
_streamDisconnectsCounter = _meter.CreateCounter<long>("mxgateway.grpc.streams.disconnected");
_workerStartupLatencyHistogram = _meter.CreateHistogram<double>("mxgateway.workers.startup.duration", "ms");
_commandLatencyHistogram = _meter.CreateHistogram<double>("mxgateway.commands.duration", "ms");
_eventStreamSendLatencyHistogram = _meter.CreateHistogram<double>("mxgateway.events.stream_send.duration", "ms");
_meter.CreateObservableGauge("mxgateway.sessions.open", GetOpenSessions);
_meter.CreateObservableGauge("mxgateway.workers.running", GetWorkersRunning);
_meter.CreateObservableGauge("mxgateway.events.queue.depth", GetEventQueueDepth);
}
public void SessionOpened()
{
lock (_syncRoot)
{
_openSessions++;
_sessionsOpened++;
}
_sessionsOpenedCounter.Add(1);
}
public void SessionClosed()
{
lock (_syncRoot)
{
if (_openSessions > 0)
{
_openSessions--;
}
_sessionsClosed++;
}
_sessionsClosedCounter.Add(1);
}
public void WorkerStarted(TimeSpan startupDuration)
{
lock (_syncRoot)
{
_workersRunning++;
}
_workerStartupLatencyHistogram.Record(startupDuration.TotalMilliseconds);
}
public void WorkerStopped(string reason)
{
lock (_syncRoot)
{
if (_workersRunning > 0)
{
_workersRunning--;
}
_workerExits++;
}
_workerExitsCounter.Add(1, new KeyValuePair<string, object?>("reason", reason));
}
public void WorkerKilled(string reason)
{
lock (_syncRoot)
{
_workerKills++;
}
_workerKillsCounter.Add(1, new KeyValuePair<string, object?>("reason", reason));
}
public void CommandStarted(string method)
{
lock (_syncRoot)
{
_commandsStarted++;
}
_commandsStartedCounter.Add(1, new KeyValuePair<string, object?>("method", method));
}
public void CommandSucceeded(string method, TimeSpan duration)
{
lock (_syncRoot)
{
_commandsSucceeded++;
}
KeyValuePair<string, object?> methodTag = new("method", method);
_commandsSucceededCounter.Add(1, methodTag);
_commandLatencyHistogram.Record(duration.TotalMilliseconds, methodTag);
}
public void CommandFailed(string method, string category, TimeSpan duration)
{
lock (_syncRoot)
{
_commandsFailed++;
Increment(_commandFailuresByMethod, method);
}
KeyValuePair<string, object?> methodTag = new("method", method);
KeyValuePair<string, object?> categoryTag = new("category", category);
_commandsFailedCounter.Add(1, methodTag, categoryTag);
_commandLatencyHistogram.Record(duration.TotalMilliseconds, methodTag, categoryTag);
}
public void EventReceived(string sessionId, string family)
{
lock (_syncRoot)
{
_eventsReceived++;
Increment(_eventsByFamily, family);
}
_eventsReceivedCounter.Add(
1,
new KeyValuePair<string, object?>("session_id", sessionId),
new KeyValuePair<string, object?>("family", family));
}
public void RecordEventStreamSend(string family, TimeSpan duration)
{
_eventStreamSendLatencyHistogram.Record(
duration.TotalMilliseconds,
new KeyValuePair<string, object?>("family", family));
}
public void SetEventQueueDepth(int depth)
{
if (depth < 0)
{
throw new ArgumentOutOfRangeException(nameof(depth), depth, "Queue depth cannot be negative.");
}
lock (_syncRoot)
{
_eventQueueDepth = depth;
}
}
public void QueueOverflow(string queueName)
{
lock (_syncRoot)
{
_queueOverflows++;
}
_queueOverflowsCounter.Add(1, new KeyValuePair<string, object?>("queue", queueName));
}
public void Fault(string category)
{
lock (_syncRoot)
{
_faults++;
}
_faultsCounter.Add(1, new KeyValuePair<string, object?>("category", category));
}
public void HeartbeatFailed(string sessionId)
{
lock (_syncRoot)
{
_heartbeatFailures++;
}
_heartbeatFailuresCounter.Add(1, new KeyValuePair<string, object?>("session_id", sessionId));
}
public void StreamDisconnected(string reason)
{
lock (_syncRoot)
{
_streamDisconnects++;
}
_streamDisconnectsCounter.Add(1, new KeyValuePair<string, object?>("reason", reason));
}
public GatewayMetricsSnapshot GetSnapshot()
{
lock (_syncRoot)
{
return new GatewayMetricsSnapshot(
OpenSessions: _openSessions,
WorkersRunning: _workersRunning,
EventQueueDepth: _eventQueueDepth,
SessionsOpened: _sessionsOpened,
SessionsClosed: _sessionsClosed,
CommandsStarted: _commandsStarted,
CommandsSucceeded: _commandsSucceeded,
CommandsFailed: _commandsFailed,
EventsReceived: _eventsReceived,
QueueOverflows: _queueOverflows,
Faults: _faults,
WorkerKills: _workerKills,
WorkerExits: _workerExits,
HeartbeatFailures: _heartbeatFailures,
StreamDisconnects: _streamDisconnects,
CommandFailuresByMethod: new Dictionary<string, long>(_commandFailuresByMethod, StringComparer.OrdinalIgnoreCase),
EventsByFamily: new Dictionary<string, long>(_eventsByFamily, StringComparer.OrdinalIgnoreCase));
}
}
public void Dispose()
{
if (_disposed)
{
return;
}
_meter.Dispose();
_disposed = true;
}
private int GetOpenSessions()
{
lock (_syncRoot)
{
return _openSessions;
}
}
private int GetWorkersRunning()
{
lock (_syncRoot)
{
return _workersRunning;
}
}
private int GetEventQueueDepth()
{
lock (_syncRoot)
{
return _eventQueueDepth;
}
}
private static void Increment(Dictionary<string, long> values, string key)
{
values.TryGetValue(key, out long currentValue);
values[key] = currentValue + 1;
}
}
@@ -0,0 +1,20 @@
namespace MxGateway.Server.Metrics;
public sealed record GatewayMetricsSnapshot(
int OpenSessions,
int WorkersRunning,
int EventQueueDepth,
long SessionsOpened,
long SessionsClosed,
long CommandsStarted,
long CommandsSucceeded,
long CommandsFailed,
long EventsReceived,
long QueueOverflows,
long Faults,
long WorkerKills,
long WorkerExits,
long HeartbeatFailures,
long StreamDisconnects,
IReadOnlyDictionary<string, long> CommandFailuresByMethod,
IReadOnlyDictionary<string, long> EventsByFamily);
@@ -0,0 +1,32 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Workers;
internal static class WorkerEnvelopeValidator
{
public static void Validate(
WorkerEnvelope envelope,
WorkerFrameProtocolOptions options)
{
if (envelope.ProtocolVersion != options.ProtocolVersion)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.ProtocolVersionMismatch,
$"Worker envelope protocol version {envelope.ProtocolVersion} does not match expected version {options.ProtocolVersion}.");
}
if (!string.Equals(envelope.SessionId, options.SessionId, StringComparison.Ordinal))
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.SessionMismatch,
"Worker envelope session id does not match the owning gateway session.");
}
if (envelope.BodyCase == WorkerEnvelope.BodyOneofCase.None)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.InvalidEnvelope,
"Worker envelope must include a typed body.");
}
}
}
@@ -0,0 +1,13 @@
namespace MxGateway.Server.Workers;
public enum WorkerFrameProtocolErrorCode
{
Unknown = 0,
InvalidConfiguration = 1,
EndOfStream = 2,
MalformedLength = 3,
MessageTooLarge = 4,
InvalidEnvelope = 5,
ProtocolVersionMismatch = 6,
SessionMismatch = 7,
}
@@ -0,0 +1,23 @@
namespace MxGateway.Server.Workers;
public sealed class WorkerFrameProtocolException : Exception
{
public WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode errorCode,
string message)
: base(message)
{
ErrorCode = errorCode;
}
public WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode errorCode,
string message,
Exception innerException)
: base(message, innerException)
{
ErrorCode = errorCode;
}
public WorkerFrameProtocolErrorCode ErrorCode { get; }
}
@@ -0,0 +1,53 @@
using MxGateway.Contracts;
namespace MxGateway.Server.Workers;
public sealed class WorkerFrameProtocolOptions
{
public const int DefaultMaxMessageBytes = 16 * 1024 * 1024;
public WorkerFrameProtocolOptions(string sessionId)
: this(
sessionId,
GatewayContractInfo.WorkerProtocolVersion,
DefaultMaxMessageBytes)
{
}
public WorkerFrameProtocolOptions(
string sessionId,
uint protocolVersion,
int maxMessageBytes)
{
if (string.IsNullOrWhiteSpace(sessionId))
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.InvalidConfiguration,
"Worker frame protocol requires a session id.");
}
if (protocolVersion == 0)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.InvalidConfiguration,
"Worker frame protocol requires a non-zero protocol version.");
}
if (maxMessageBytes <= 0)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.InvalidConfiguration,
"Worker frame protocol max message size must be greater than zero.");
}
SessionId = sessionId;
ProtocolVersion = protocolVersion;
MaxMessageBytes = maxMessageBytes;
}
public string SessionId { get; }
public uint ProtocolVersion { get; }
public int MaxMessageBytes { get; }
}
@@ -0,0 +1,77 @@
using System.Buffers.Binary;
using Google.Protobuf;
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Workers;
public sealed class WorkerFrameReader
{
private readonly WorkerFrameProtocolOptions _options;
private readonly Stream _stream;
public WorkerFrameReader(
Stream stream,
WorkerFrameProtocolOptions options)
{
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
_options = options ?? throw new ArgumentNullException(nameof(options));
}
public async ValueTask<WorkerEnvelope> ReadAsync(CancellationToken cancellationToken = default)
{
byte[] lengthPrefix = new byte[sizeof(uint)];
await ReadExactlyOrThrowAsync(lengthPrefix, cancellationToken).ConfigureAwait(false);
uint payloadLength = BinaryPrimitives.ReadUInt32LittleEndian(lengthPrefix);
if (payloadLength == 0)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.MalformedLength,
"Worker frame payload length must be greater than zero.");
}
if (payloadLength > _options.MaxMessageBytes)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.MessageTooLarge,
$"Worker frame payload length {payloadLength} exceeds the configured maximum of {_options.MaxMessageBytes} bytes.");
}
byte[] payload = new byte[payloadLength];
await ReadExactlyOrThrowAsync(payload, cancellationToken).ConfigureAwait(false);
WorkerEnvelope envelope;
try
{
envelope = WorkerEnvelope.Parser.ParseFrom(payload);
}
catch (InvalidProtocolBufferException exception)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.InvalidEnvelope,
"Worker frame payload is not a valid WorkerEnvelope protobuf message.",
exception);
}
WorkerEnvelopeValidator.Validate(envelope, _options);
return envelope;
}
private async ValueTask ReadExactlyOrThrowAsync(
Memory<byte> buffer,
CancellationToken cancellationToken)
{
try
{
await _stream.ReadExactlyAsync(buffer, cancellationToken).ConfigureAwait(false);
}
catch (EndOfStreamException exception)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.EndOfStream,
"Worker frame ended before the expected number of bytes were read.",
exception);
}
}
}
@@ -0,0 +1,48 @@
using System.Buffers.Binary;
using Google.Protobuf;
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Workers;
public sealed class WorkerFrameWriter
{
private readonly WorkerFrameProtocolOptions _options;
private readonly Stream _stream;
public WorkerFrameWriter(
Stream stream,
WorkerFrameProtocolOptions options)
{
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
_options = options ?? throw new ArgumentNullException(nameof(options));
}
public async ValueTask WriteAsync(
WorkerEnvelope envelope,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(envelope);
WorkerEnvelopeValidator.Validate(envelope, _options);
int payloadLength = envelope.CalculateSize();
if (payloadLength == 0)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.InvalidEnvelope,
"Worker envelope cannot serialize to an empty payload.");
}
if (payloadLength > _options.MaxMessageBytes)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.MessageTooLarge,
$"Worker envelope payload length {payloadLength} exceeds the configured maximum of {_options.MaxMessageBytes} bytes.");
}
byte[] lengthPrefix = new byte[sizeof(uint)];
BinaryPrimitives.WriteUInt32LittleEndian(lengthPrefix, (uint)payloadLength);
await _stream.WriteAsync(lengthPrefix, cancellationToken).ConfigureAwait(false);
await _stream.WriteAsync(envelope.ToByteArray(), cancellationToken).ConfigureAwait(false);
}
}
+40 -1
View File
@@ -5,5 +5,44 @@
"Microsoft.AspNetCore": "Warning"
}
},
"AllowedHosts": "*"
"AllowedHosts": "*",
"MxGateway": {
"Authentication": {
"Mode": "ApiKey",
"SqlitePath": "C:\\ProgramData\\MxGateway\\gateway-auth.db",
"PepperSecretName": "MxGateway:ApiKeyPepper",
"RunMigrationsOnStartup": true
},
"Worker": {
"ExecutablePath": "src\\MxGateway.Worker\\bin\\x86\\Release\\MxGateway.Worker.exe",
"RequiredArchitecture": "X86",
"StartupTimeoutSeconds": 30,
"ShutdownTimeoutSeconds": 10,
"HeartbeatIntervalSeconds": 5,
"HeartbeatGraceSeconds": 15,
"MaxMessageBytes": 16777216
},
"Sessions": {
"DefaultCommandTimeoutSeconds": 30,
"MaxSessions": 64,
"AllowMultipleEventSubscribers": false
},
"Events": {
"QueueCapacity": 10000,
"BackpressurePolicy": "FailFast"
},
"Dashboard": {
"Enabled": true,
"PathBase": "/dashboard",
"RequireAdminScope": true,
"AllowAnonymousLocalhost": false,
"SnapshotIntervalMilliseconds": 1000,
"RecentFaultLimit": 100,
"RecentSessionLimit": 200,
"ShowTagValues": false
},
"Protocol": {
"WorkerProtocolVersion": 1
}
}
}
@@ -0,0 +1,119 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using MxGateway.Server.Configuration;
namespace MxGateway.Tests.Configuration;
public sealed class GatewayOptionsTests
{
[Fact]
public void OptionsBinding_UsesDesignDefaults()
{
GatewayOptions options = BindOptions(new Dictionary<string, string?>());
Assert.Equal(AuthenticationMode.ApiKey, options.Authentication.Mode);
Assert.Equal(@"C:\ProgramData\MxGateway\gateway-auth.db", options.Authentication.SqlitePath);
Assert.Equal("MxGateway:ApiKeyPepper", options.Authentication.PepperSecretName);
Assert.True(options.Authentication.RunMigrationsOnStartup);
Assert.Equal(@"src\MxGateway.Worker\bin\x86\Release\MxGateway.Worker.exe", options.Worker.ExecutablePath);
Assert.Equal(WorkerArchitecture.X86, options.Worker.RequiredArchitecture);
Assert.Equal(30, options.Worker.StartupTimeoutSeconds);
Assert.Equal(10, options.Worker.ShutdownTimeoutSeconds);
Assert.Equal(5, options.Worker.HeartbeatIntervalSeconds);
Assert.Equal(15, options.Worker.HeartbeatGraceSeconds);
Assert.Equal(16 * 1024 * 1024, options.Worker.MaxMessageBytes);
Assert.Equal(30, options.Sessions.DefaultCommandTimeoutSeconds);
Assert.Equal(64, options.Sessions.MaxSessions);
Assert.False(options.Sessions.AllowMultipleEventSubscribers);
Assert.Equal(10_000, options.Events.QueueCapacity);
Assert.Equal(EventBackpressurePolicy.FailFast, options.Events.BackpressurePolicy);
Assert.True(options.Dashboard.Enabled);
Assert.Equal("/dashboard", options.Dashboard.PathBase);
Assert.True(options.Dashboard.RequireAdminScope);
Assert.False(options.Dashboard.AllowAnonymousLocalhost);
Assert.Equal(1_000, options.Dashboard.SnapshotIntervalMilliseconds);
Assert.Equal(100, options.Dashboard.RecentFaultLimit);
Assert.Equal(200, options.Dashboard.RecentSessionLimit);
Assert.False(options.Dashboard.ShowTagValues);
Assert.Equal(1u, options.Protocol.WorkerProtocolVersion);
}
[Fact]
public void OptionsBinding_AppliesConfigurationOverrides()
{
GatewayOptions options = BindOptions(
new Dictionary<string, string?>
{
["MxGateway:Authentication:Mode"] = "Disabled",
["MxGateway:Worker:ExecutablePath"] = @"C:\Gateway\MxGateway.Worker.exe",
["MxGateway:Sessions:MaxSessions"] = "12",
["MxGateway:Events:QueueCapacity"] = "256",
["MxGateway:Dashboard:Enabled"] = "false"
});
Assert.Equal(AuthenticationMode.Disabled, options.Authentication.Mode);
Assert.Equal(@"C:\Gateway\MxGateway.Worker.exe", options.Worker.ExecutablePath);
Assert.Equal(12, options.Sessions.MaxSessions);
Assert.Equal(256, options.Events.QueueCapacity);
Assert.False(options.Dashboard.Enabled);
}
[Theory]
[InlineData("MxGateway:Worker:ExecutablePath", "worker.dll", "MxGateway:Worker:ExecutablePath must point to a .exe file.")]
[InlineData("MxGateway:Events:QueueCapacity", "0", "MxGateway:Events:QueueCapacity must be greater than zero.")]
[InlineData("MxGateway:Authentication:PepperSecretName", "", "MxGateway:Authentication:PepperSecretName is required")]
[InlineData("MxGateway:Dashboard:PathBase", "dashboard", "MxGateway:Dashboard:PathBase must start with '/'.")]
public void Validation_InvalidConfiguration_FailsClearly(string key, string value, string expectedFailure)
{
OptionsValidationException exception = Assert.Throws<OptionsValidationException>(() =>
_ = BindOptions(new Dictionary<string, string?> { [key] = value }));
Assert.Contains(exception.Failures, failure => failure.Contains(expectedFailure, StringComparison.Ordinal));
}
[Fact]
public void EffectiveConfiguration_RedactsPepperSecretName()
{
using ServiceProvider services = BuildServices(
new Dictionary<string, string?>
{
["MxGateway:Authentication:PepperSecretName"] = "RawPepperSecretName"
});
IGatewayConfigurationProvider provider = services.GetRequiredService<IGatewayConfigurationProvider>();
EffectiveGatewayConfiguration configuration = provider.GetEffectiveConfiguration();
Assert.Equal(GatewayConfigurationProvider.RedactedValue, configuration.Authentication.PepperSecretName);
Assert.DoesNotContain(
"RawPepperSecretName",
System.Text.Json.JsonSerializer.Serialize(configuration),
StringComparison.Ordinal);
}
private static GatewayOptions BindOptions(IReadOnlyDictionary<string, string?> configurationValues)
{
using ServiceProvider services = BuildServices(configurationValues);
return services.GetRequiredService<IOptions<GatewayOptions>>().Value;
}
private static ServiceProvider BuildServices(IReadOnlyDictionary<string, string?> configurationValues)
{
IConfigurationRoot configuration = new ConfigurationBuilder()
.AddInMemoryCollection(configurationValues)
.Build();
ServiceCollection services = new();
services.AddSingleton<IConfiguration>(configuration);
services.AddGatewayConfiguration();
return services.BuildServiceProvider(validateScopes: true);
}
}
@@ -0,0 +1,73 @@
using MxGateway.Server.Diagnostics;
namespace MxGateway.Tests.Diagnostics;
public sealed class GatewayLogRedactorTests
{
[Fact]
public void RedactApiKey_PreservesKeyIdAndRemovesSecret()
{
string? redacted = GatewayLogRedactor.RedactApiKey("Bearer mxgw_operator01_super-secret");
Assert.Equal("Bearer mxgw_operator01_[redacted]", redacted);
Assert.DoesNotContain("super-secret", redacted);
}
[Theory]
[InlineData("AuthenticateUser")]
[InlineData("WriteSecured")]
[InlineData("WriteSecured2")]
public void IsCredentialBearingCommand_IdentifiesSensitiveMxAccessCommands(string commandMethod)
{
Assert.True(GatewayLogRedactor.IsCredentialBearingCommand(commandMethod));
}
[Fact]
public void RedactCommandValue_DoesNotLogRawValuesByDefault()
{
object? redacted = GatewayLogRedactor.RedactCommandValue("Write", "plaintext-tag-value");
Assert.Equal("[redacted]", redacted);
}
[Fact]
public void RedactCommandValue_RedactsSecuredWriteEvenWhenValueLoggingIsEnabled()
{
object? redacted = GatewayLogRedactor.RedactCommandValue(
"WriteSecured",
"credential-bearing-value",
valueLoggingEnabled: true);
Assert.Equal("[redacted]", redacted);
}
[Fact]
public void RedactCommandValue_AllowsNonSensitiveValueOnlyWhenValueLoggingIsEnabled()
{
object? redacted = GatewayLogRedactor.RedactCommandValue(
"Write",
"diagnostic-value",
valueLoggingEnabled: true);
Assert.Equal("diagnostic-value", redacted);
}
[Fact]
public void LogScope_RedactsClientIdentityBeforeScopeStateIsCreated()
{
GatewayLogScope scope = new(
SessionId: "session-1",
WorkerProcessId: 1234,
CorrelationId: 99,
CommandMethod: "AuthenticateUser",
ClientIdentity: "Bearer mxgw_admin_secret");
IReadOnlyDictionary<string, object?> values = scope.ToDictionary();
Assert.Equal("session-1", values["SessionId"]);
Assert.Equal(1234, values["WorkerProcessId"]);
Assert.Equal((ulong)99, values["CorrelationId"]);
Assert.Equal("AuthenticateUser", values["CommandMethod"]);
Assert.Equal("Bearer mxgw_admin_[redacted]", values["ClientIdentity"]);
}
}
@@ -1,6 +1,9 @@
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Routing;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using MxGateway.Server;
using MxGateway.Server.Metrics;
namespace MxGateway.Tests.Gateway;
@@ -19,4 +22,47 @@ public sealed class GatewayApplicationTests
Assert.Equal("LiveHealth", endpoint.Metadata.GetMetadata<IEndpointNameMetadata>()?.EndpointName);
}
[Fact]
public void Build_RegistersGatewayMetrics()
{
WebApplication app = GatewayApplication.Build([]);
GatewayMetrics metrics = app.Services.GetRequiredService<GatewayMetrics>();
Assert.NotNull(metrics);
}
[Theory]
[InlineData(
"MxGateway:Worker:ExecutablePath",
"worker.dll",
"MxGateway:Worker:ExecutablePath must point to a .exe file.")]
[InlineData(
"MxGateway:Events:QueueCapacity",
"0",
"MxGateway:Events:QueueCapacity must be greater than zero.")]
[InlineData(
"MxGateway:Authentication:PepperSecretName",
"",
"MxGateway:Authentication:PepperSecretName is required")]
[InlineData(
"MxGateway:Dashboard:PathBase",
"dashboard",
"MxGateway:Dashboard:PathBase must start with '/'.")]
public async Task StartAsync_InvalidGatewayConfiguration_FailsStartup(
string key,
string value,
string expectedFailure)
{
await using WebApplication app = GatewayApplication.Build(
[$"--{key}={value}", "--urls=http://127.0.0.1:0"]);
OptionsValidationException exception = await Assert.ThrowsAsync<OptionsValidationException>(
() => app.StartAsync());
Assert.Contains(
exception.Failures,
failure => failure.Contains(expectedFailure, StringComparison.Ordinal));
}
}
@@ -0,0 +1,209 @@
using System.Buffers.Binary;
using Google.Protobuf;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Workers;
namespace MxGateway.Tests.Gateway.Workers;
public sealed class WorkerFrameProtocolTests
{
private const string SessionId = "session-1";
[Fact]
public async Task WriteAndReadAsync_WithValidEnvelope_RoundTripsFrame()
{
WorkerFrameProtocolOptions options = new(SessionId);
await using MemoryStream stream = new();
WorkerEnvelope original = CreateEnvelope();
WorkerFrameWriter writer = new(stream, options);
await writer.WriteAsync(original);
stream.Position = 0;
WorkerFrameReader reader = new(stream, options);
WorkerEnvelope parsed = await reader.ReadAsync();
Assert.Equal(original, parsed);
}
[Fact]
public async Task ReadAsync_WithPartialReads_ReassemblesFrame()
{
WorkerFrameProtocolOptions options = new(SessionId);
WorkerEnvelope original = CreateEnvelope();
byte[] frame = CreateFrame(original);
await using ChunkedReadStream stream = new(frame, chunkSize: 2);
WorkerFrameReader reader = new(stream, options);
WorkerEnvelope parsed = await reader.ReadAsync();
Assert.Equal(original, parsed);
Assert.True(stream.ReadCallCount > 2);
}
[Fact]
public async Task ReadAsync_WithZeroLengthFrame_ThrowsMalformedLength()
{
WorkerFrameProtocolOptions options = new(SessionId);
await using MemoryStream stream = new(new byte[sizeof(uint)]);
WorkerFrameReader reader = new(stream, options);
WorkerFrameProtocolException exception =
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
async () => await reader.ReadAsync());
Assert.Equal(WorkerFrameProtocolErrorCode.MalformedLength, exception.ErrorCode);
}
[Fact]
public async Task ReadAsync_WithOversizedLength_ThrowsBeforePayloadAllocation()
{
WorkerFrameProtocolOptions options = new(SessionId, GatewayContractInfo.WorkerProtocolVersion, maxMessageBytes: 16);
byte[] lengthPrefix = new byte[sizeof(uint)];
BinaryPrimitives.WriteUInt32LittleEndian(lengthPrefix, 17);
await using MemoryStream stream = new(lengthPrefix);
WorkerFrameReader reader = new(stream, options);
WorkerFrameProtocolException exception =
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
async () => await reader.ReadAsync());
Assert.Equal(WorkerFrameProtocolErrorCode.MessageTooLarge, exception.ErrorCode);
}
[Fact]
public async Task ReadAsync_WithWrongProtocolVersion_ThrowsProtocolVersionMismatch()
{
WorkerFrameProtocolOptions options = new(SessionId);
WorkerEnvelope envelope = CreateEnvelope();
envelope.ProtocolVersion++;
await using MemoryStream stream = new(CreateFrame(envelope));
WorkerFrameReader reader = new(stream, options);
WorkerFrameProtocolException exception =
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
async () => await reader.ReadAsync());
Assert.Equal(WorkerFrameProtocolErrorCode.ProtocolVersionMismatch, exception.ErrorCode);
}
[Fact]
public async Task ReadAsync_WithWrongSessionId_ThrowsSessionMismatch()
{
WorkerFrameProtocolOptions options = new(SessionId);
WorkerEnvelope envelope = CreateEnvelope();
envelope.SessionId = "different-session";
await using MemoryStream stream = new(CreateFrame(envelope));
WorkerFrameReader reader = new(stream, options);
WorkerFrameProtocolException exception =
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
async () => await reader.ReadAsync());
Assert.Equal(WorkerFrameProtocolErrorCode.SessionMismatch, exception.ErrorCode);
}
[Fact]
public async Task ReadAsync_WithMalformedPayload_ThrowsInvalidEnvelope()
{
WorkerFrameProtocolOptions options = new(SessionId);
byte[] frame = CreateFrame([0x80]);
await using MemoryStream stream = new(frame);
WorkerFrameReader reader = new(stream, options);
WorkerFrameProtocolException exception =
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
async () => await reader.ReadAsync());
Assert.Equal(WorkerFrameProtocolErrorCode.InvalidEnvelope, exception.ErrorCode);
}
[Fact]
public async Task ReadAsync_WithMissingEnvelopeBody_ThrowsInvalidEnvelope()
{
WorkerFrameProtocolOptions options = new(SessionId);
WorkerEnvelope envelope = CreateEnvelope();
envelope.ClearBody();
await using MemoryStream stream = new(CreateFrame(envelope));
WorkerFrameReader reader = new(stream, options);
WorkerFrameProtocolException exception =
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
async () => await reader.ReadAsync());
Assert.Equal(WorkerFrameProtocolErrorCode.InvalidEnvelope, exception.ErrorCode);
}
[Fact]
public async Task WriteAsync_WithOversizedEnvelope_ThrowsMessageTooLarge()
{
WorkerFrameProtocolOptions options = new(SessionId, GatewayContractInfo.WorkerProtocolVersion, maxMessageBytes: 8);
await using MemoryStream stream = new();
WorkerFrameWriter writer = new(stream, options);
WorkerFrameProtocolException exception =
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
async () => await writer.WriteAsync(CreateEnvelope()));
Assert.Equal(WorkerFrameProtocolErrorCode.MessageTooLarge, exception.ErrorCode);
Assert.Equal(0, stream.Length);
}
private static WorkerEnvelope CreateEnvelope()
{
return new WorkerEnvelope
{
ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
SessionId = SessionId,
Sequence = 1,
CorrelationId = "correlation-1",
WorkerHello = new WorkerHello
{
ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
Nonce = "nonce",
WorkerProcessId = 1234,
WorkerVersion = "test-worker",
},
};
}
private static byte[] CreateFrame(IMessage message)
{
return CreateFrame(message.ToByteArray());
}
private static byte[] CreateFrame(byte[] payload)
{
byte[] frame = new byte[sizeof(uint) + payload.Length];
BinaryPrimitives.WriteUInt32LittleEndian(frame.AsSpan(0, sizeof(uint)), (uint)payload.Length);
payload.CopyTo(frame.AsSpan(sizeof(uint)));
return frame;
}
private sealed class ChunkedReadStream : MemoryStream
{
private readonly int _chunkSize;
public ChunkedReadStream(
byte[] buffer,
int chunkSize)
: base(buffer)
{
_chunkSize = chunkSize;
}
public int ReadCallCount { get; private set; }
public override ValueTask<int> ReadAsync(
Memory<byte> buffer,
CancellationToken cancellationToken = default)
{
ReadCallCount++;
int requestedCount = Math.Min(buffer.Length, _chunkSize);
return base.ReadAsync(buffer[..requestedCount], cancellationToken);
}
}
}
@@ -0,0 +1,60 @@
using MxGateway.Server.Metrics;
namespace MxGateway.Tests.Metrics;
public sealed class GatewayMetricsTests
{
[Fact]
public void GetSnapshot_ReflectsSessionWorkerCommandEventAndFaultUpdates()
{
using GatewayMetrics metrics = new();
metrics.SessionOpened();
metrics.WorkerStarted(TimeSpan.FromMilliseconds(250));
metrics.CommandStarted("Register");
metrics.CommandSucceeded("Register", TimeSpan.FromMilliseconds(10));
metrics.CommandStarted("WriteSecured");
metrics.CommandFailed("WriteSecured", "AuthorizationFailed", TimeSpan.FromMilliseconds(12));
metrics.EventReceived("session-1", "OnDataChange");
metrics.EventReceived("session-1", "OnDataChange");
metrics.SetEventQueueDepth(7);
metrics.QueueOverflow("session-events");
metrics.Fault("CommandTimeout");
metrics.WorkerKilled("CommandTimeout");
metrics.WorkerStopped("Killed");
metrics.HeartbeatFailed("session-1");
metrics.StreamDisconnected("ClientCancelled");
metrics.SessionClosed();
GatewayMetricsSnapshot snapshot = metrics.GetSnapshot();
Assert.Equal(0, snapshot.OpenSessions);
Assert.Equal(0, snapshot.WorkersRunning);
Assert.Equal(7, snapshot.EventQueueDepth);
Assert.Equal(1, snapshot.SessionsOpened);
Assert.Equal(1, snapshot.SessionsClosed);
Assert.Equal(2, snapshot.CommandsStarted);
Assert.Equal(1, snapshot.CommandsSucceeded);
Assert.Equal(1, snapshot.CommandsFailed);
Assert.Equal(2, snapshot.EventsReceived);
Assert.Equal(1, snapshot.QueueOverflows);
Assert.Equal(1, snapshot.Faults);
Assert.Equal(1, snapshot.WorkerKills);
Assert.Equal(1, snapshot.WorkerExits);
Assert.Equal(1, snapshot.HeartbeatFailures);
Assert.Equal(1, snapshot.StreamDisconnects);
Assert.Equal(1, snapshot.CommandFailuresByMethod["WriteSecured"]);
Assert.Equal(2, snapshot.EventsByFamily["OnDataChange"]);
}
[Fact]
public void SetEventQueueDepth_RejectsNegativeDepth()
{
using GatewayMetrics metrics = new();
ArgumentOutOfRangeException exception = Assert.Throws<ArgumentOutOfRangeException>(
() => metrics.SetEventQueueDepth(-1));
Assert.Equal("depth", exception.ParamName);
}
}