Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a5098e6815 | |||
| 41ddd122a6 | |||
| a25f09e795 | |||
| 37da9d8f44 | |||
| 7dfec6dc8c |
@@ -0,0 +1,54 @@
|
|||||||
|
# Worker Frame Protocol
|
||||||
|
|
||||||
|
The gateway uses the worker frame protocol to move `WorkerEnvelope` protobuf
|
||||||
|
messages over a bidirectional named pipe. The frame layer is deliberately small:
|
||||||
|
it handles message boundaries, size limits, protobuf parsing, and envelope
|
||||||
|
validation before higher-level worker client code routes commands, replies,
|
||||||
|
events, and faults.
|
||||||
|
|
||||||
|
## Frame Format
|
||||||
|
|
||||||
|
Each frame starts with a four-byte little-endian unsigned payload length,
|
||||||
|
followed by the serialized `WorkerEnvelope` payload:
|
||||||
|
|
||||||
|
```text
|
||||||
|
uint32 little-endian payload_length
|
||||||
|
payload_length bytes protobuf WorkerEnvelope
|
||||||
|
```
|
||||||
|
|
||||||
|
The reader rejects zero-length payloads and payloads larger than the configured
|
||||||
|
maximum before allocating the payload buffer. The default maximum is 16 MiB,
|
||||||
|
matching the gateway process design.
|
||||||
|
|
||||||
|
## Envelope Validation
|
||||||
|
|
||||||
|
`WorkerFrameReader` and `WorkerFrameWriter` validate each envelope against the
|
||||||
|
owning session before returning or writing it:
|
||||||
|
|
||||||
|
- `protocol_version` must match the configured worker protocol version,
|
||||||
|
- `session_id` must match the owning gateway session,
|
||||||
|
- the envelope must contain one typed `body` value.
|
||||||
|
|
||||||
|
Protocol violations throw `WorkerFrameProtocolException` with a
|
||||||
|
`WorkerFrameProtocolErrorCode` so callers can distinguish malformed frames,
|
||||||
|
oversized frames, protocol version mismatches, and session mismatches.
|
||||||
|
|
||||||
|
## Verification
|
||||||
|
|
||||||
|
Run the focused tests after changing the frame protocol:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter WorkerFrameProtocolTests
|
||||||
|
```
|
||||||
|
|
||||||
|
Run the gateway build because the frame protocol is part of
|
||||||
|
`MxGateway.Server`:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
dotnet build src/MxGateway.Server/MxGateway.Server.csproj
|
||||||
|
```
|
||||||
|
|
||||||
|
## Related Documentation
|
||||||
|
|
||||||
|
- [Gateway Process Detailed Design](./gateway-process-design.md)
|
||||||
|
- [Protobuf Contracts](./Contracts.md)
|
||||||
@@ -105,6 +105,12 @@ Do not let Razor components directly mutate gateway session or worker objects.
|
|||||||
Create a small read-only dashboard service that projects gateway state into
|
Create a small read-only dashboard service that projects gateway state into
|
||||||
plain DTOs.
|
plain DTOs.
|
||||||
|
|
||||||
|
`GatewayMetrics.GetSnapshot()` is the metrics input for the first dashboard
|
||||||
|
projection. It carries current session and worker gauges, command and event
|
||||||
|
counters, queue depth, and fault totals. The dashboard reads that snapshot
|
||||||
|
instead of reading raw `Meter` instruments because exporter configuration is an
|
||||||
|
operations concern, not a UI dependency.
|
||||||
|
|
||||||
Suggested service:
|
Suggested service:
|
||||||
|
|
||||||
```csharp
|
```csharp
|
||||||
@@ -361,4 +367,3 @@ The first dashboard slice should implement:
|
|||||||
8. workers page with worker table.
|
8. workers page with worker table.
|
||||||
9. 1-second realtime refresh through Blazor Server.
|
9. 1-second realtime refresh through Blazor Server.
|
||||||
10. redaction tests for secrets.
|
10. redaction tests for secrets.
|
||||||
|
|
||||||
|
|||||||
@@ -664,6 +664,26 @@ Metrics:
|
|||||||
|
|
||||||
Do not log credential values or full tag values by default.
|
Do not log credential values or full tag values by default.
|
||||||
|
|
||||||
|
The gateway registers `GatewayMetrics` as the in-process metrics foundation.
|
||||||
|
It emits .NET `Meter` instruments for collectors and keeps a
|
||||||
|
`GatewayMetricsSnapshot` for dashboard projection. The snapshot exists because
|
||||||
|
the dashboard needs current counters and queue depths without depending on a
|
||||||
|
specific metrics exporter.
|
||||||
|
|
||||||
|
HTTP request handling uses `UseGatewayRequestLoggingScope()` to attach common
|
||||||
|
structured log fields when request metadata is present:
|
||||||
|
|
||||||
|
- `SessionId`,
|
||||||
|
- `ClientIdentity`,
|
||||||
|
- `WorkerProcessId`,
|
||||||
|
- `CorrelationId`,
|
||||||
|
- `CommandMethod`.
|
||||||
|
|
||||||
|
`GatewayLogRedactor` redacts API key secrets and command values before they are
|
||||||
|
added to log state. Value logging remains opt-in and redacted by default so
|
||||||
|
secured writes, authentication commands, and ordinary tag values do not leak
|
||||||
|
through diagnostics.
|
||||||
|
|
||||||
## Configuration
|
## Configuration
|
||||||
|
|
||||||
Suggested configuration shape:
|
Suggested configuration shape:
|
||||||
|
|||||||
@@ -45,6 +45,8 @@ Detailed follow-up docs:
|
|||||||
- `docs/gateway-process-design.md` covers the .NET 10 gateway process,
|
- `docs/gateway-process-design.md` covers the .NET 10 gateway process,
|
||||||
session manager, worker supervision, gRPC API, event streaming, fault model,
|
session manager, worker supervision, gRPC API, event streaming, fault model,
|
||||||
security, observability, and test strategy.
|
security, observability, and test strategy.
|
||||||
|
- `docs/WorkerFrameProtocol.md` covers the gateway-side named-pipe frame
|
||||||
|
reader/writer and `WorkerEnvelope` validation rules.
|
||||||
- `docs/mxaccess-worker-instance-design.md` covers each .NET Framework 4.8 x86
|
- `docs/mxaccess-worker-instance-design.md` covers each .NET Framework 4.8 x86
|
||||||
MXAccess worker instance, including STA ownership, message pumping, COM
|
MXAccess worker instance, including STA ownership, message pumping, COM
|
||||||
lifetime, command dispatch, event sinks, conversion, and shutdown.
|
lifetime, command dispatch, event sinks, conversion, and shutdown.
|
||||||
@@ -97,6 +99,13 @@ Responsibilities:
|
|||||||
|
|
||||||
The gateway must never instantiate or call MXAccess directly.
|
The gateway must never instantiate or call MXAccess directly.
|
||||||
|
|
||||||
|
The gateway observability foundation lives in `MxGateway.Server.Diagnostics`
|
||||||
|
and `MxGateway.Server.Metrics`. Structured logging scopes carry session,
|
||||||
|
worker, correlation, command, and client identity fields with redaction applied
|
||||||
|
before values enter log state. `GatewayMetrics` exposes counters, gauges, and
|
||||||
|
histograms through .NET `Meter` and a snapshot API that dashboard services can
|
||||||
|
project without binding to a metrics exporter.
|
||||||
|
|
||||||
### Worker Process
|
### Worker Process
|
||||||
|
|
||||||
Runtime:
|
Runtime:
|
||||||
|
|||||||
@@ -0,0 +1,78 @@
|
|||||||
|
namespace MxGateway.Server.Diagnostics;
|
||||||
|
|
||||||
|
public static class GatewayLogRedactor
|
||||||
|
{
|
||||||
|
public const string RedactedValue = "[redacted]";
|
||||||
|
|
||||||
|
private static readonly HashSet<string> SensitiveCommandMethods = new(StringComparer.OrdinalIgnoreCase)
|
||||||
|
{
|
||||||
|
"AuthenticateUser",
|
||||||
|
"WriteSecured",
|
||||||
|
"WriteSecured2"
|
||||||
|
};
|
||||||
|
|
||||||
|
public static bool IsCredentialBearingCommand(string? commandMethod)
|
||||||
|
{
|
||||||
|
return commandMethod is not null
|
||||||
|
&& SensitiveCommandMethods.Contains(commandMethod);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static string? RedactApiKey(string? authorizationHeader)
|
||||||
|
{
|
||||||
|
if (string.IsNullOrWhiteSpace(authorizationHeader))
|
||||||
|
{
|
||||||
|
return authorizationHeader;
|
||||||
|
}
|
||||||
|
|
||||||
|
const string bearerPrefix = "Bearer ";
|
||||||
|
if (!authorizationHeader.StartsWith(bearerPrefix, StringComparison.OrdinalIgnoreCase))
|
||||||
|
{
|
||||||
|
return RedactedValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
string token = authorizationHeader[bearerPrefix.Length..].Trim();
|
||||||
|
|
||||||
|
if (!token.StartsWith("mxgw_", StringComparison.OrdinalIgnoreCase))
|
||||||
|
{
|
||||||
|
return $"{bearerPrefix}{RedactedValue}";
|
||||||
|
}
|
||||||
|
|
||||||
|
string[] tokenParts = token.Split('_', 3, StringSplitOptions.RemoveEmptyEntries);
|
||||||
|
if (tokenParts.Length < 2)
|
||||||
|
{
|
||||||
|
return $"{bearerPrefix}mxgw_{RedactedValue}";
|
||||||
|
}
|
||||||
|
|
||||||
|
return $"{bearerPrefix}mxgw_{tokenParts[1]}_{RedactedValue}";
|
||||||
|
}
|
||||||
|
|
||||||
|
public static string? RedactClientIdentity(string? clientIdentity)
|
||||||
|
{
|
||||||
|
if (string.IsNullOrWhiteSpace(clientIdentity))
|
||||||
|
{
|
||||||
|
return clientIdentity;
|
||||||
|
}
|
||||||
|
|
||||||
|
return clientIdentity.Contains("mxgw_", StringComparison.OrdinalIgnoreCase)
|
||||||
|
? RedactApiKey(clientIdentity)
|
||||||
|
: clientIdentity;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static object? RedactCommandValue(
|
||||||
|
string? commandMethod,
|
||||||
|
object? value,
|
||||||
|
bool valueLoggingEnabled = false)
|
||||||
|
{
|
||||||
|
if (value is null)
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!valueLoggingEnabled || IsCredentialBearingCommand(commandMethod))
|
||||||
|
{
|
||||||
|
return RedactedValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,33 @@
|
|||||||
|
namespace MxGateway.Server.Diagnostics;
|
||||||
|
|
||||||
|
public sealed record GatewayLogScope(
|
||||||
|
string? SessionId = null,
|
||||||
|
int? WorkerProcessId = null,
|
||||||
|
ulong? CorrelationId = null,
|
||||||
|
string? CommandMethod = null,
|
||||||
|
string? ClientIdentity = null)
|
||||||
|
{
|
||||||
|
public IReadOnlyDictionary<string, object?> ToDictionary()
|
||||||
|
{
|
||||||
|
Dictionary<string, object?> values = [];
|
||||||
|
|
||||||
|
AddIfPresent(values, "SessionId", SessionId);
|
||||||
|
AddIfPresent(values, "WorkerProcessId", WorkerProcessId);
|
||||||
|
AddIfPresent(values, "CorrelationId", CorrelationId);
|
||||||
|
AddIfPresent(values, "CommandMethod", CommandMethod);
|
||||||
|
AddIfPresent(values, "ClientIdentity", GatewayLogRedactor.RedactClientIdentity(ClientIdentity));
|
||||||
|
|
||||||
|
return values;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void AddIfPresent(
|
||||||
|
Dictionary<string, object?> values,
|
||||||
|
string key,
|
||||||
|
object? value)
|
||||||
|
{
|
||||||
|
if (value is not null)
|
||||||
|
{
|
||||||
|
values[key] = value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,16 @@
|
|||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Diagnostics;
|
||||||
|
|
||||||
|
public static class GatewayLoggerExtensions
|
||||||
|
{
|
||||||
|
public static IDisposable? BeginGatewayScope(
|
||||||
|
this ILogger logger,
|
||||||
|
GatewayLogScope scope)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(logger);
|
||||||
|
ArgumentNullException.ThrowIfNull(scope);
|
||||||
|
|
||||||
|
return logger.BeginScope(scope.ToDictionary());
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,57 @@
|
|||||||
|
using Microsoft.Extensions.Primitives;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Diagnostics;
|
||||||
|
|
||||||
|
public static class GatewayRequestLoggingMiddlewareExtensions
|
||||||
|
{
|
||||||
|
public const string SessionIdHeaderName = "x-session-id";
|
||||||
|
public const string WorkerProcessIdHeaderName = "x-worker-process-id";
|
||||||
|
public const string CorrelationIdHeaderName = "x-correlation-id";
|
||||||
|
public const string CommandMethodHeaderName = "x-command-method";
|
||||||
|
|
||||||
|
public static IApplicationBuilder UseGatewayRequestLoggingScope(this IApplicationBuilder app)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(app);
|
||||||
|
|
||||||
|
return app.Use(async (context, next) =>
|
||||||
|
{
|
||||||
|
ILogger logger = context.RequestServices
|
||||||
|
.GetRequiredService<ILoggerFactory>()
|
||||||
|
.CreateLogger("MxGateway.Request");
|
||||||
|
|
||||||
|
using IDisposable? scope = logger.BeginGatewayScope(new GatewayLogScope(
|
||||||
|
SessionId: ReadHeader(context, SessionIdHeaderName),
|
||||||
|
WorkerProcessId: ReadInt32Header(context, WorkerProcessIdHeaderName),
|
||||||
|
CorrelationId: ReadUInt64Header(context, CorrelationIdHeaderName),
|
||||||
|
CommandMethod: ReadHeader(context, CommandMethodHeaderName),
|
||||||
|
ClientIdentity: ReadHeader(context, "authorization")));
|
||||||
|
|
||||||
|
await next(context);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private static string? ReadHeader(HttpContext context, string headerName)
|
||||||
|
{
|
||||||
|
return context.Request.Headers.TryGetValue(headerName, out StringValues values)
|
||||||
|
? values.ToString()
|
||||||
|
: null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int? ReadInt32Header(HttpContext context, string headerName)
|
||||||
|
{
|
||||||
|
string? value = ReadHeader(context, headerName);
|
||||||
|
|
||||||
|
return int.TryParse(value, out int parsedValue)
|
||||||
|
? parsedValue
|
||||||
|
: null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ulong? ReadUInt64Header(HttpContext context, string headerName)
|
||||||
|
{
|
||||||
|
string? value = ReadHeader(context, headerName);
|
||||||
|
|
||||||
|
return ulong.TryParse(value, out ulong parsedValue)
|
||||||
|
? parsedValue
|
||||||
|
: null;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,5 +1,7 @@
|
|||||||
using MxGateway.Contracts;
|
using MxGateway.Contracts;
|
||||||
using MxGateway.Server.Configuration;
|
using MxGateway.Server.Configuration;
|
||||||
|
using MxGateway.Server.Diagnostics;
|
||||||
|
using MxGateway.Server.Metrics;
|
||||||
|
|
||||||
namespace MxGateway.Server;
|
namespace MxGateway.Server;
|
||||||
|
|
||||||
@@ -10,6 +12,7 @@ public static class GatewayApplication
|
|||||||
WebApplicationBuilder builder = CreateBuilder(args);
|
WebApplicationBuilder builder = CreateBuilder(args);
|
||||||
WebApplication app = builder.Build();
|
WebApplication app = builder.Build();
|
||||||
|
|
||||||
|
app.UseGatewayRequestLoggingScope();
|
||||||
app.MapGatewayEndpoints();
|
app.MapGatewayEndpoints();
|
||||||
|
|
||||||
return app;
|
return app;
|
||||||
@@ -21,6 +24,7 @@ public static class GatewayApplication
|
|||||||
|
|
||||||
builder.Services.AddGatewayConfiguration();
|
builder.Services.AddGatewayConfiguration();
|
||||||
builder.Services.AddHealthChecks();
|
builder.Services.AddHealthChecks();
|
||||||
|
builder.Services.AddSingleton<GatewayMetrics>();
|
||||||
|
|
||||||
return builder;
|
return builder;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,306 @@
|
|||||||
|
using System.Diagnostics.Metrics;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Metrics;
|
||||||
|
|
||||||
|
public sealed class GatewayMetrics : IDisposable
|
||||||
|
{
|
||||||
|
public const string MeterName = "MxGateway.Server";
|
||||||
|
|
||||||
|
private readonly object _syncRoot = new();
|
||||||
|
private readonly Meter _meter;
|
||||||
|
private readonly Counter<long> _sessionsOpenedCounter;
|
||||||
|
private readonly Counter<long> _sessionsClosedCounter;
|
||||||
|
private readonly Counter<long> _commandsStartedCounter;
|
||||||
|
private readonly Counter<long> _commandsSucceededCounter;
|
||||||
|
private readonly Counter<long> _commandsFailedCounter;
|
||||||
|
private readonly Counter<long> _eventsReceivedCounter;
|
||||||
|
private readonly Counter<long> _queueOverflowsCounter;
|
||||||
|
private readonly Counter<long> _faultsCounter;
|
||||||
|
private readonly Counter<long> _workerKillsCounter;
|
||||||
|
private readonly Counter<long> _workerExitsCounter;
|
||||||
|
private readonly Counter<long> _heartbeatFailuresCounter;
|
||||||
|
private readonly Counter<long> _streamDisconnectsCounter;
|
||||||
|
private readonly Histogram<double> _workerStartupLatencyHistogram;
|
||||||
|
private readonly Histogram<double> _commandLatencyHistogram;
|
||||||
|
private readonly Histogram<double> _eventStreamSendLatencyHistogram;
|
||||||
|
private readonly Dictionary<string, long> _commandFailuresByMethod = new(StringComparer.OrdinalIgnoreCase);
|
||||||
|
private readonly Dictionary<string, long> _eventsByFamily = new(StringComparer.OrdinalIgnoreCase);
|
||||||
|
|
||||||
|
private int _openSessions;
|
||||||
|
private int _workersRunning;
|
||||||
|
private int _eventQueueDepth;
|
||||||
|
private long _sessionsOpened;
|
||||||
|
private long _sessionsClosed;
|
||||||
|
private long _commandsStarted;
|
||||||
|
private long _commandsSucceeded;
|
||||||
|
private long _commandsFailed;
|
||||||
|
private long _eventsReceived;
|
||||||
|
private long _queueOverflows;
|
||||||
|
private long _faults;
|
||||||
|
private long _workerKills;
|
||||||
|
private long _workerExits;
|
||||||
|
private long _heartbeatFailures;
|
||||||
|
private long _streamDisconnects;
|
||||||
|
private bool _disposed;
|
||||||
|
|
||||||
|
public GatewayMetrics()
|
||||||
|
{
|
||||||
|
_meter = new Meter(MeterName, typeof(GatewayMetrics).Assembly.GetName().Version?.ToString());
|
||||||
|
_sessionsOpenedCounter = _meter.CreateCounter<long>("mxgateway.sessions.opened");
|
||||||
|
_sessionsClosedCounter = _meter.CreateCounter<long>("mxgateway.sessions.closed");
|
||||||
|
_commandsStartedCounter = _meter.CreateCounter<long>("mxgateway.commands.started");
|
||||||
|
_commandsSucceededCounter = _meter.CreateCounter<long>("mxgateway.commands.succeeded");
|
||||||
|
_commandsFailedCounter = _meter.CreateCounter<long>("mxgateway.commands.failed");
|
||||||
|
_eventsReceivedCounter = _meter.CreateCounter<long>("mxgateway.events.received");
|
||||||
|
_queueOverflowsCounter = _meter.CreateCounter<long>("mxgateway.queues.overflows");
|
||||||
|
_faultsCounter = _meter.CreateCounter<long>("mxgateway.faults");
|
||||||
|
_workerKillsCounter = _meter.CreateCounter<long>("mxgateway.workers.killed");
|
||||||
|
_workerExitsCounter = _meter.CreateCounter<long>("mxgateway.workers.exited");
|
||||||
|
_heartbeatFailuresCounter = _meter.CreateCounter<long>("mxgateway.heartbeats.failed");
|
||||||
|
_streamDisconnectsCounter = _meter.CreateCounter<long>("mxgateway.grpc.streams.disconnected");
|
||||||
|
_workerStartupLatencyHistogram = _meter.CreateHistogram<double>("mxgateway.workers.startup.duration", "ms");
|
||||||
|
_commandLatencyHistogram = _meter.CreateHistogram<double>("mxgateway.commands.duration", "ms");
|
||||||
|
_eventStreamSendLatencyHistogram = _meter.CreateHistogram<double>("mxgateway.events.stream_send.duration", "ms");
|
||||||
|
|
||||||
|
_meter.CreateObservableGauge("mxgateway.sessions.open", GetOpenSessions);
|
||||||
|
_meter.CreateObservableGauge("mxgateway.workers.running", GetWorkersRunning);
|
||||||
|
_meter.CreateObservableGauge("mxgateway.events.queue.depth", GetEventQueueDepth);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void SessionOpened()
|
||||||
|
{
|
||||||
|
lock (_syncRoot)
|
||||||
|
{
|
||||||
|
_openSessions++;
|
||||||
|
_sessionsOpened++;
|
||||||
|
}
|
||||||
|
|
||||||
|
_sessionsOpenedCounter.Add(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void SessionClosed()
|
||||||
|
{
|
||||||
|
lock (_syncRoot)
|
||||||
|
{
|
||||||
|
if (_openSessions > 0)
|
||||||
|
{
|
||||||
|
_openSessions--;
|
||||||
|
}
|
||||||
|
|
||||||
|
_sessionsClosed++;
|
||||||
|
}
|
||||||
|
|
||||||
|
_sessionsClosedCounter.Add(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void WorkerStarted(TimeSpan startupDuration)
|
||||||
|
{
|
||||||
|
lock (_syncRoot)
|
||||||
|
{
|
||||||
|
_workersRunning++;
|
||||||
|
}
|
||||||
|
|
||||||
|
_workerStartupLatencyHistogram.Record(startupDuration.TotalMilliseconds);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void WorkerStopped(string reason)
|
||||||
|
{
|
||||||
|
lock (_syncRoot)
|
||||||
|
{
|
||||||
|
if (_workersRunning > 0)
|
||||||
|
{
|
||||||
|
_workersRunning--;
|
||||||
|
}
|
||||||
|
|
||||||
|
_workerExits++;
|
||||||
|
}
|
||||||
|
|
||||||
|
_workerExitsCounter.Add(1, new KeyValuePair<string, object?>("reason", reason));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void WorkerKilled(string reason)
|
||||||
|
{
|
||||||
|
lock (_syncRoot)
|
||||||
|
{
|
||||||
|
_workerKills++;
|
||||||
|
}
|
||||||
|
|
||||||
|
_workerKillsCounter.Add(1, new KeyValuePair<string, object?>("reason", reason));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void CommandStarted(string method)
|
||||||
|
{
|
||||||
|
lock (_syncRoot)
|
||||||
|
{
|
||||||
|
_commandsStarted++;
|
||||||
|
}
|
||||||
|
|
||||||
|
_commandsStartedCounter.Add(1, new KeyValuePair<string, object?>("method", method));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void CommandSucceeded(string method, TimeSpan duration)
|
||||||
|
{
|
||||||
|
lock (_syncRoot)
|
||||||
|
{
|
||||||
|
_commandsSucceeded++;
|
||||||
|
}
|
||||||
|
|
||||||
|
KeyValuePair<string, object?> methodTag = new("method", method);
|
||||||
|
_commandsSucceededCounter.Add(1, methodTag);
|
||||||
|
_commandLatencyHistogram.Record(duration.TotalMilliseconds, methodTag);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void CommandFailed(string method, string category, TimeSpan duration)
|
||||||
|
{
|
||||||
|
lock (_syncRoot)
|
||||||
|
{
|
||||||
|
_commandsFailed++;
|
||||||
|
Increment(_commandFailuresByMethod, method);
|
||||||
|
}
|
||||||
|
|
||||||
|
KeyValuePair<string, object?> methodTag = new("method", method);
|
||||||
|
KeyValuePair<string, object?> categoryTag = new("category", category);
|
||||||
|
_commandsFailedCounter.Add(1, methodTag, categoryTag);
|
||||||
|
_commandLatencyHistogram.Record(duration.TotalMilliseconds, methodTag, categoryTag);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void EventReceived(string sessionId, string family)
|
||||||
|
{
|
||||||
|
lock (_syncRoot)
|
||||||
|
{
|
||||||
|
_eventsReceived++;
|
||||||
|
Increment(_eventsByFamily, family);
|
||||||
|
}
|
||||||
|
|
||||||
|
_eventsReceivedCounter.Add(
|
||||||
|
1,
|
||||||
|
new KeyValuePair<string, object?>("session_id", sessionId),
|
||||||
|
new KeyValuePair<string, object?>("family", family));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void RecordEventStreamSend(string family, TimeSpan duration)
|
||||||
|
{
|
||||||
|
_eventStreamSendLatencyHistogram.Record(
|
||||||
|
duration.TotalMilliseconds,
|
||||||
|
new KeyValuePair<string, object?>("family", family));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void SetEventQueueDepth(int depth)
|
||||||
|
{
|
||||||
|
if (depth < 0)
|
||||||
|
{
|
||||||
|
throw new ArgumentOutOfRangeException(nameof(depth), depth, "Queue depth cannot be negative.");
|
||||||
|
}
|
||||||
|
|
||||||
|
lock (_syncRoot)
|
||||||
|
{
|
||||||
|
_eventQueueDepth = depth;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void QueueOverflow(string queueName)
|
||||||
|
{
|
||||||
|
lock (_syncRoot)
|
||||||
|
{
|
||||||
|
_queueOverflows++;
|
||||||
|
}
|
||||||
|
|
||||||
|
_queueOverflowsCounter.Add(1, new KeyValuePair<string, object?>("queue", queueName));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Fault(string category)
|
||||||
|
{
|
||||||
|
lock (_syncRoot)
|
||||||
|
{
|
||||||
|
_faults++;
|
||||||
|
}
|
||||||
|
|
||||||
|
_faultsCounter.Add(1, new KeyValuePair<string, object?>("category", category));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void HeartbeatFailed(string sessionId)
|
||||||
|
{
|
||||||
|
lock (_syncRoot)
|
||||||
|
{
|
||||||
|
_heartbeatFailures++;
|
||||||
|
}
|
||||||
|
|
||||||
|
_heartbeatFailuresCounter.Add(1, new KeyValuePair<string, object?>("session_id", sessionId));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void StreamDisconnected(string reason)
|
||||||
|
{
|
||||||
|
lock (_syncRoot)
|
||||||
|
{
|
||||||
|
_streamDisconnects++;
|
||||||
|
}
|
||||||
|
|
||||||
|
_streamDisconnectsCounter.Add(1, new KeyValuePair<string, object?>("reason", reason));
|
||||||
|
}
|
||||||
|
|
||||||
|
public GatewayMetricsSnapshot GetSnapshot()
|
||||||
|
{
|
||||||
|
lock (_syncRoot)
|
||||||
|
{
|
||||||
|
return new GatewayMetricsSnapshot(
|
||||||
|
OpenSessions: _openSessions,
|
||||||
|
WorkersRunning: _workersRunning,
|
||||||
|
EventQueueDepth: _eventQueueDepth,
|
||||||
|
SessionsOpened: _sessionsOpened,
|
||||||
|
SessionsClosed: _sessionsClosed,
|
||||||
|
CommandsStarted: _commandsStarted,
|
||||||
|
CommandsSucceeded: _commandsSucceeded,
|
||||||
|
CommandsFailed: _commandsFailed,
|
||||||
|
EventsReceived: _eventsReceived,
|
||||||
|
QueueOverflows: _queueOverflows,
|
||||||
|
Faults: _faults,
|
||||||
|
WorkerKills: _workerKills,
|
||||||
|
WorkerExits: _workerExits,
|
||||||
|
HeartbeatFailures: _heartbeatFailures,
|
||||||
|
StreamDisconnects: _streamDisconnects,
|
||||||
|
CommandFailuresByMethod: new Dictionary<string, long>(_commandFailuresByMethod, StringComparer.OrdinalIgnoreCase),
|
||||||
|
EventsByFamily: new Dictionary<string, long>(_eventsByFamily, StringComparer.OrdinalIgnoreCase));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Dispose()
|
||||||
|
{
|
||||||
|
if (_disposed)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
_meter.Dispose();
|
||||||
|
_disposed = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private int GetOpenSessions()
|
||||||
|
{
|
||||||
|
lock (_syncRoot)
|
||||||
|
{
|
||||||
|
return _openSessions;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private int GetWorkersRunning()
|
||||||
|
{
|
||||||
|
lock (_syncRoot)
|
||||||
|
{
|
||||||
|
return _workersRunning;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private int GetEventQueueDepth()
|
||||||
|
{
|
||||||
|
lock (_syncRoot)
|
||||||
|
{
|
||||||
|
return _eventQueueDepth;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void Increment(Dictionary<string, long> values, string key)
|
||||||
|
{
|
||||||
|
values.TryGetValue(key, out long currentValue);
|
||||||
|
values[key] = currentValue + 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,20 @@
|
|||||||
|
namespace MxGateway.Server.Metrics;
|
||||||
|
|
||||||
|
public sealed record GatewayMetricsSnapshot(
|
||||||
|
int OpenSessions,
|
||||||
|
int WorkersRunning,
|
||||||
|
int EventQueueDepth,
|
||||||
|
long SessionsOpened,
|
||||||
|
long SessionsClosed,
|
||||||
|
long CommandsStarted,
|
||||||
|
long CommandsSucceeded,
|
||||||
|
long CommandsFailed,
|
||||||
|
long EventsReceived,
|
||||||
|
long QueueOverflows,
|
||||||
|
long Faults,
|
||||||
|
long WorkerKills,
|
||||||
|
long WorkerExits,
|
||||||
|
long HeartbeatFailures,
|
||||||
|
long StreamDisconnects,
|
||||||
|
IReadOnlyDictionary<string, long> CommandFailuresByMethod,
|
||||||
|
IReadOnlyDictionary<string, long> EventsByFamily);
|
||||||
@@ -0,0 +1,32 @@
|
|||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Workers;
|
||||||
|
|
||||||
|
internal static class WorkerEnvelopeValidator
|
||||||
|
{
|
||||||
|
public static void Validate(
|
||||||
|
WorkerEnvelope envelope,
|
||||||
|
WorkerFrameProtocolOptions options)
|
||||||
|
{
|
||||||
|
if (envelope.ProtocolVersion != options.ProtocolVersion)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.ProtocolVersionMismatch,
|
||||||
|
$"Worker envelope protocol version {envelope.ProtocolVersion} does not match expected version {options.ProtocolVersion}.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!string.Equals(envelope.SessionId, options.SessionId, StringComparison.Ordinal))
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.SessionMismatch,
|
||||||
|
"Worker envelope session id does not match the owning gateway session.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (envelope.BodyCase == WorkerEnvelope.BodyOneofCase.None)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.InvalidEnvelope,
|
||||||
|
"Worker envelope must include a typed body.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,13 @@
|
|||||||
|
namespace MxGateway.Server.Workers;
|
||||||
|
|
||||||
|
public enum WorkerFrameProtocolErrorCode
|
||||||
|
{
|
||||||
|
Unknown = 0,
|
||||||
|
InvalidConfiguration = 1,
|
||||||
|
EndOfStream = 2,
|
||||||
|
MalformedLength = 3,
|
||||||
|
MessageTooLarge = 4,
|
||||||
|
InvalidEnvelope = 5,
|
||||||
|
ProtocolVersionMismatch = 6,
|
||||||
|
SessionMismatch = 7,
|
||||||
|
}
|
||||||
@@ -0,0 +1,23 @@
|
|||||||
|
namespace MxGateway.Server.Workers;
|
||||||
|
|
||||||
|
public sealed class WorkerFrameProtocolException : Exception
|
||||||
|
{
|
||||||
|
public WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode errorCode,
|
||||||
|
string message)
|
||||||
|
: base(message)
|
||||||
|
{
|
||||||
|
ErrorCode = errorCode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode errorCode,
|
||||||
|
string message,
|
||||||
|
Exception innerException)
|
||||||
|
: base(message, innerException)
|
||||||
|
{
|
||||||
|
ErrorCode = errorCode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public WorkerFrameProtocolErrorCode ErrorCode { get; }
|
||||||
|
}
|
||||||
@@ -0,0 +1,53 @@
|
|||||||
|
using MxGateway.Contracts;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Workers;
|
||||||
|
|
||||||
|
public sealed class WorkerFrameProtocolOptions
|
||||||
|
{
|
||||||
|
public const int DefaultMaxMessageBytes = 16 * 1024 * 1024;
|
||||||
|
|
||||||
|
public WorkerFrameProtocolOptions(string sessionId)
|
||||||
|
: this(
|
||||||
|
sessionId,
|
||||||
|
GatewayContractInfo.WorkerProtocolVersion,
|
||||||
|
DefaultMaxMessageBytes)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
public WorkerFrameProtocolOptions(
|
||||||
|
string sessionId,
|
||||||
|
uint protocolVersion,
|
||||||
|
int maxMessageBytes)
|
||||||
|
{
|
||||||
|
if (string.IsNullOrWhiteSpace(sessionId))
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.InvalidConfiguration,
|
||||||
|
"Worker frame protocol requires a session id.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (protocolVersion == 0)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.InvalidConfiguration,
|
||||||
|
"Worker frame protocol requires a non-zero protocol version.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (maxMessageBytes <= 0)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.InvalidConfiguration,
|
||||||
|
"Worker frame protocol max message size must be greater than zero.");
|
||||||
|
}
|
||||||
|
|
||||||
|
SessionId = sessionId;
|
||||||
|
ProtocolVersion = protocolVersion;
|
||||||
|
MaxMessageBytes = maxMessageBytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public string SessionId { get; }
|
||||||
|
|
||||||
|
public uint ProtocolVersion { get; }
|
||||||
|
|
||||||
|
public int MaxMessageBytes { get; }
|
||||||
|
}
|
||||||
@@ -0,0 +1,77 @@
|
|||||||
|
using System.Buffers.Binary;
|
||||||
|
using Google.Protobuf;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Workers;
|
||||||
|
|
||||||
|
public sealed class WorkerFrameReader
|
||||||
|
{
|
||||||
|
private readonly WorkerFrameProtocolOptions _options;
|
||||||
|
private readonly Stream _stream;
|
||||||
|
|
||||||
|
public WorkerFrameReader(
|
||||||
|
Stream stream,
|
||||||
|
WorkerFrameProtocolOptions options)
|
||||||
|
{
|
||||||
|
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
|
||||||
|
_options = options ?? throw new ArgumentNullException(nameof(options));
|
||||||
|
}
|
||||||
|
|
||||||
|
public async ValueTask<WorkerEnvelope> ReadAsync(CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
byte[] lengthPrefix = new byte[sizeof(uint)];
|
||||||
|
await ReadExactlyOrThrowAsync(lengthPrefix, cancellationToken).ConfigureAwait(false);
|
||||||
|
|
||||||
|
uint payloadLength = BinaryPrimitives.ReadUInt32LittleEndian(lengthPrefix);
|
||||||
|
if (payloadLength == 0)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.MalformedLength,
|
||||||
|
"Worker frame payload length must be greater than zero.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (payloadLength > _options.MaxMessageBytes)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.MessageTooLarge,
|
||||||
|
$"Worker frame payload length {payloadLength} exceeds the configured maximum of {_options.MaxMessageBytes} bytes.");
|
||||||
|
}
|
||||||
|
|
||||||
|
byte[] payload = new byte[payloadLength];
|
||||||
|
await ReadExactlyOrThrowAsync(payload, cancellationToken).ConfigureAwait(false);
|
||||||
|
|
||||||
|
WorkerEnvelope envelope;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
envelope = WorkerEnvelope.Parser.ParseFrom(payload);
|
||||||
|
}
|
||||||
|
catch (InvalidProtocolBufferException exception)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.InvalidEnvelope,
|
||||||
|
"Worker frame payload is not a valid WorkerEnvelope protobuf message.",
|
||||||
|
exception);
|
||||||
|
}
|
||||||
|
|
||||||
|
WorkerEnvelopeValidator.Validate(envelope, _options);
|
||||||
|
|
||||||
|
return envelope;
|
||||||
|
}
|
||||||
|
|
||||||
|
private async ValueTask ReadExactlyOrThrowAsync(
|
||||||
|
Memory<byte> buffer,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await _stream.ReadExactlyAsync(buffer, cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
catch (EndOfStreamException exception)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.EndOfStream,
|
||||||
|
"Worker frame ended before the expected number of bytes were read.",
|
||||||
|
exception);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,48 @@
|
|||||||
|
using System.Buffers.Binary;
|
||||||
|
using Google.Protobuf;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Workers;
|
||||||
|
|
||||||
|
public sealed class WorkerFrameWriter
|
||||||
|
{
|
||||||
|
private readonly WorkerFrameProtocolOptions _options;
|
||||||
|
private readonly Stream _stream;
|
||||||
|
|
||||||
|
public WorkerFrameWriter(
|
||||||
|
Stream stream,
|
||||||
|
WorkerFrameProtocolOptions options)
|
||||||
|
{
|
||||||
|
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
|
||||||
|
_options = options ?? throw new ArgumentNullException(nameof(options));
|
||||||
|
}
|
||||||
|
|
||||||
|
public async ValueTask WriteAsync(
|
||||||
|
WorkerEnvelope envelope,
|
||||||
|
CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(envelope);
|
||||||
|
WorkerEnvelopeValidator.Validate(envelope, _options);
|
||||||
|
|
||||||
|
int payloadLength = envelope.CalculateSize();
|
||||||
|
if (payloadLength == 0)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.InvalidEnvelope,
|
||||||
|
"Worker envelope cannot serialize to an empty payload.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (payloadLength > _options.MaxMessageBytes)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.MessageTooLarge,
|
||||||
|
$"Worker envelope payload length {payloadLength} exceeds the configured maximum of {_options.MaxMessageBytes} bytes.");
|
||||||
|
}
|
||||||
|
|
||||||
|
byte[] lengthPrefix = new byte[sizeof(uint)];
|
||||||
|
BinaryPrimitives.WriteUInt32LittleEndian(lengthPrefix, (uint)payloadLength);
|
||||||
|
|
||||||
|
await _stream.WriteAsync(lengthPrefix, cancellationToken).ConfigureAwait(false);
|
||||||
|
await _stream.WriteAsync(envelope.ToByteArray(), cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,73 @@
|
|||||||
|
using MxGateway.Server.Diagnostics;
|
||||||
|
|
||||||
|
namespace MxGateway.Tests.Diagnostics;
|
||||||
|
|
||||||
|
public sealed class GatewayLogRedactorTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public void RedactApiKey_PreservesKeyIdAndRemovesSecret()
|
||||||
|
{
|
||||||
|
string? redacted = GatewayLogRedactor.RedactApiKey("Bearer mxgw_operator01_super-secret");
|
||||||
|
|
||||||
|
Assert.Equal("Bearer mxgw_operator01_[redacted]", redacted);
|
||||||
|
Assert.DoesNotContain("super-secret", redacted);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Theory]
|
||||||
|
[InlineData("AuthenticateUser")]
|
||||||
|
[InlineData("WriteSecured")]
|
||||||
|
[InlineData("WriteSecured2")]
|
||||||
|
public void IsCredentialBearingCommand_IdentifiesSensitiveMxAccessCommands(string commandMethod)
|
||||||
|
{
|
||||||
|
Assert.True(GatewayLogRedactor.IsCredentialBearingCommand(commandMethod));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void RedactCommandValue_DoesNotLogRawValuesByDefault()
|
||||||
|
{
|
||||||
|
object? redacted = GatewayLogRedactor.RedactCommandValue("Write", "plaintext-tag-value");
|
||||||
|
|
||||||
|
Assert.Equal("[redacted]", redacted);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void RedactCommandValue_RedactsSecuredWriteEvenWhenValueLoggingIsEnabled()
|
||||||
|
{
|
||||||
|
object? redacted = GatewayLogRedactor.RedactCommandValue(
|
||||||
|
"WriteSecured",
|
||||||
|
"credential-bearing-value",
|
||||||
|
valueLoggingEnabled: true);
|
||||||
|
|
||||||
|
Assert.Equal("[redacted]", redacted);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void RedactCommandValue_AllowsNonSensitiveValueOnlyWhenValueLoggingIsEnabled()
|
||||||
|
{
|
||||||
|
object? redacted = GatewayLogRedactor.RedactCommandValue(
|
||||||
|
"Write",
|
||||||
|
"diagnostic-value",
|
||||||
|
valueLoggingEnabled: true);
|
||||||
|
|
||||||
|
Assert.Equal("diagnostic-value", redacted);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void LogScope_RedactsClientIdentityBeforeScopeStateIsCreated()
|
||||||
|
{
|
||||||
|
GatewayLogScope scope = new(
|
||||||
|
SessionId: "session-1",
|
||||||
|
WorkerProcessId: 1234,
|
||||||
|
CorrelationId: 99,
|
||||||
|
CommandMethod: "AuthenticateUser",
|
||||||
|
ClientIdentity: "Bearer mxgw_admin_secret");
|
||||||
|
|
||||||
|
IReadOnlyDictionary<string, object?> values = scope.ToDictionary();
|
||||||
|
|
||||||
|
Assert.Equal("session-1", values["SessionId"]);
|
||||||
|
Assert.Equal(1234, values["WorkerProcessId"]);
|
||||||
|
Assert.Equal((ulong)99, values["CorrelationId"]);
|
||||||
|
Assert.Equal("AuthenticateUser", values["CommandMethod"]);
|
||||||
|
Assert.Equal("Bearer mxgw_admin_[redacted]", values["ClientIdentity"]);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,7 +1,9 @@
|
|||||||
using Microsoft.AspNetCore.Builder;
|
using Microsoft.AspNetCore.Builder;
|
||||||
using Microsoft.AspNetCore.Routing;
|
using Microsoft.AspNetCore.Routing;
|
||||||
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using Microsoft.Extensions.Options;
|
using Microsoft.Extensions.Options;
|
||||||
using MxGateway.Server;
|
using MxGateway.Server;
|
||||||
|
using MxGateway.Server.Metrics;
|
||||||
|
|
||||||
namespace MxGateway.Tests.Gateway;
|
namespace MxGateway.Tests.Gateway;
|
||||||
|
|
||||||
@@ -21,6 +23,16 @@ public sealed class GatewayApplicationTests
|
|||||||
Assert.Equal("LiveHealth", endpoint.Metadata.GetMetadata<IEndpointNameMetadata>()?.EndpointName);
|
Assert.Equal("LiveHealth", endpoint.Metadata.GetMetadata<IEndpointNameMetadata>()?.EndpointName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Build_RegistersGatewayMetrics()
|
||||||
|
{
|
||||||
|
WebApplication app = GatewayApplication.Build([]);
|
||||||
|
|
||||||
|
GatewayMetrics metrics = app.Services.GetRequiredService<GatewayMetrics>();
|
||||||
|
|
||||||
|
Assert.NotNull(metrics);
|
||||||
|
}
|
||||||
|
|
||||||
[Theory]
|
[Theory]
|
||||||
[InlineData(
|
[InlineData(
|
||||||
"MxGateway:Worker:ExecutablePath",
|
"MxGateway:Worker:ExecutablePath",
|
||||||
|
|||||||
@@ -0,0 +1,209 @@
|
|||||||
|
using System.Buffers.Binary;
|
||||||
|
using Google.Protobuf;
|
||||||
|
using MxGateway.Contracts;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
using MxGateway.Server.Workers;
|
||||||
|
|
||||||
|
namespace MxGateway.Tests.Gateway.Workers;
|
||||||
|
|
||||||
|
public sealed class WorkerFrameProtocolTests
|
||||||
|
{
|
||||||
|
private const string SessionId = "session-1";
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task WriteAndReadAsync_WithValidEnvelope_RoundTripsFrame()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = new(SessionId);
|
||||||
|
await using MemoryStream stream = new();
|
||||||
|
WorkerEnvelope original = CreateEnvelope();
|
||||||
|
|
||||||
|
WorkerFrameWriter writer = new(stream, options);
|
||||||
|
await writer.WriteAsync(original);
|
||||||
|
stream.Position = 0;
|
||||||
|
|
||||||
|
WorkerFrameReader reader = new(stream, options);
|
||||||
|
WorkerEnvelope parsed = await reader.ReadAsync();
|
||||||
|
|
||||||
|
Assert.Equal(original, parsed);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadAsync_WithPartialReads_ReassemblesFrame()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = new(SessionId);
|
||||||
|
WorkerEnvelope original = CreateEnvelope();
|
||||||
|
byte[] frame = CreateFrame(original);
|
||||||
|
await using ChunkedReadStream stream = new(frame, chunkSize: 2);
|
||||||
|
|
||||||
|
WorkerFrameReader reader = new(stream, options);
|
||||||
|
WorkerEnvelope parsed = await reader.ReadAsync();
|
||||||
|
|
||||||
|
Assert.Equal(original, parsed);
|
||||||
|
Assert.True(stream.ReadCallCount > 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadAsync_WithZeroLengthFrame_ThrowsMalformedLength()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = new(SessionId);
|
||||||
|
await using MemoryStream stream = new(new byte[sizeof(uint)]);
|
||||||
|
|
||||||
|
WorkerFrameReader reader = new(stream, options);
|
||||||
|
WorkerFrameProtocolException exception =
|
||||||
|
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||||
|
async () => await reader.ReadAsync());
|
||||||
|
|
||||||
|
Assert.Equal(WorkerFrameProtocolErrorCode.MalformedLength, exception.ErrorCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadAsync_WithOversizedLength_ThrowsBeforePayloadAllocation()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = new(SessionId, GatewayContractInfo.WorkerProtocolVersion, maxMessageBytes: 16);
|
||||||
|
byte[] lengthPrefix = new byte[sizeof(uint)];
|
||||||
|
BinaryPrimitives.WriteUInt32LittleEndian(lengthPrefix, 17);
|
||||||
|
await using MemoryStream stream = new(lengthPrefix);
|
||||||
|
|
||||||
|
WorkerFrameReader reader = new(stream, options);
|
||||||
|
WorkerFrameProtocolException exception =
|
||||||
|
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||||
|
async () => await reader.ReadAsync());
|
||||||
|
|
||||||
|
Assert.Equal(WorkerFrameProtocolErrorCode.MessageTooLarge, exception.ErrorCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadAsync_WithWrongProtocolVersion_ThrowsProtocolVersionMismatch()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = new(SessionId);
|
||||||
|
WorkerEnvelope envelope = CreateEnvelope();
|
||||||
|
envelope.ProtocolVersion++;
|
||||||
|
await using MemoryStream stream = new(CreateFrame(envelope));
|
||||||
|
|
||||||
|
WorkerFrameReader reader = new(stream, options);
|
||||||
|
WorkerFrameProtocolException exception =
|
||||||
|
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||||
|
async () => await reader.ReadAsync());
|
||||||
|
|
||||||
|
Assert.Equal(WorkerFrameProtocolErrorCode.ProtocolVersionMismatch, exception.ErrorCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadAsync_WithWrongSessionId_ThrowsSessionMismatch()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = new(SessionId);
|
||||||
|
WorkerEnvelope envelope = CreateEnvelope();
|
||||||
|
envelope.SessionId = "different-session";
|
||||||
|
await using MemoryStream stream = new(CreateFrame(envelope));
|
||||||
|
|
||||||
|
WorkerFrameReader reader = new(stream, options);
|
||||||
|
WorkerFrameProtocolException exception =
|
||||||
|
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||||
|
async () => await reader.ReadAsync());
|
||||||
|
|
||||||
|
Assert.Equal(WorkerFrameProtocolErrorCode.SessionMismatch, exception.ErrorCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadAsync_WithMalformedPayload_ThrowsInvalidEnvelope()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = new(SessionId);
|
||||||
|
byte[] frame = CreateFrame([0x80]);
|
||||||
|
await using MemoryStream stream = new(frame);
|
||||||
|
|
||||||
|
WorkerFrameReader reader = new(stream, options);
|
||||||
|
WorkerFrameProtocolException exception =
|
||||||
|
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||||
|
async () => await reader.ReadAsync());
|
||||||
|
|
||||||
|
Assert.Equal(WorkerFrameProtocolErrorCode.InvalidEnvelope, exception.ErrorCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadAsync_WithMissingEnvelopeBody_ThrowsInvalidEnvelope()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = new(SessionId);
|
||||||
|
WorkerEnvelope envelope = CreateEnvelope();
|
||||||
|
envelope.ClearBody();
|
||||||
|
await using MemoryStream stream = new(CreateFrame(envelope));
|
||||||
|
|
||||||
|
WorkerFrameReader reader = new(stream, options);
|
||||||
|
WorkerFrameProtocolException exception =
|
||||||
|
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||||
|
async () => await reader.ReadAsync());
|
||||||
|
|
||||||
|
Assert.Equal(WorkerFrameProtocolErrorCode.InvalidEnvelope, exception.ErrorCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task WriteAsync_WithOversizedEnvelope_ThrowsMessageTooLarge()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = new(SessionId, GatewayContractInfo.WorkerProtocolVersion, maxMessageBytes: 8);
|
||||||
|
await using MemoryStream stream = new();
|
||||||
|
|
||||||
|
WorkerFrameWriter writer = new(stream, options);
|
||||||
|
WorkerFrameProtocolException exception =
|
||||||
|
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||||
|
async () => await writer.WriteAsync(CreateEnvelope()));
|
||||||
|
|
||||||
|
Assert.Equal(WorkerFrameProtocolErrorCode.MessageTooLarge, exception.ErrorCode);
|
||||||
|
Assert.Equal(0, stream.Length);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static WorkerEnvelope CreateEnvelope()
|
||||||
|
{
|
||||||
|
return new WorkerEnvelope
|
||||||
|
{
|
||||||
|
ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
|
||||||
|
SessionId = SessionId,
|
||||||
|
Sequence = 1,
|
||||||
|
CorrelationId = "correlation-1",
|
||||||
|
WorkerHello = new WorkerHello
|
||||||
|
{
|
||||||
|
ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
|
||||||
|
Nonce = "nonce",
|
||||||
|
WorkerProcessId = 1234,
|
||||||
|
WorkerVersion = "test-worker",
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static byte[] CreateFrame(IMessage message)
|
||||||
|
{
|
||||||
|
return CreateFrame(message.ToByteArray());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static byte[] CreateFrame(byte[] payload)
|
||||||
|
{
|
||||||
|
byte[] frame = new byte[sizeof(uint) + payload.Length];
|
||||||
|
BinaryPrimitives.WriteUInt32LittleEndian(frame.AsSpan(0, sizeof(uint)), (uint)payload.Length);
|
||||||
|
payload.CopyTo(frame.AsSpan(sizeof(uint)));
|
||||||
|
|
||||||
|
return frame;
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class ChunkedReadStream : MemoryStream
|
||||||
|
{
|
||||||
|
private readonly int _chunkSize;
|
||||||
|
|
||||||
|
public ChunkedReadStream(
|
||||||
|
byte[] buffer,
|
||||||
|
int chunkSize)
|
||||||
|
: base(buffer)
|
||||||
|
{
|
||||||
|
_chunkSize = chunkSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int ReadCallCount { get; private set; }
|
||||||
|
|
||||||
|
public override ValueTask<int> ReadAsync(
|
||||||
|
Memory<byte> buffer,
|
||||||
|
CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
ReadCallCount++;
|
||||||
|
int requestedCount = Math.Min(buffer.Length, _chunkSize);
|
||||||
|
|
||||||
|
return base.ReadAsync(buffer[..requestedCount], cancellationToken);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,60 @@
|
|||||||
|
using MxGateway.Server.Metrics;
|
||||||
|
|
||||||
|
namespace MxGateway.Tests.Metrics;
|
||||||
|
|
||||||
|
public sealed class GatewayMetricsTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public void GetSnapshot_ReflectsSessionWorkerCommandEventAndFaultUpdates()
|
||||||
|
{
|
||||||
|
using GatewayMetrics metrics = new();
|
||||||
|
|
||||||
|
metrics.SessionOpened();
|
||||||
|
metrics.WorkerStarted(TimeSpan.FromMilliseconds(250));
|
||||||
|
metrics.CommandStarted("Register");
|
||||||
|
metrics.CommandSucceeded("Register", TimeSpan.FromMilliseconds(10));
|
||||||
|
metrics.CommandStarted("WriteSecured");
|
||||||
|
metrics.CommandFailed("WriteSecured", "AuthorizationFailed", TimeSpan.FromMilliseconds(12));
|
||||||
|
metrics.EventReceived("session-1", "OnDataChange");
|
||||||
|
metrics.EventReceived("session-1", "OnDataChange");
|
||||||
|
metrics.SetEventQueueDepth(7);
|
||||||
|
metrics.QueueOverflow("session-events");
|
||||||
|
metrics.Fault("CommandTimeout");
|
||||||
|
metrics.WorkerKilled("CommandTimeout");
|
||||||
|
metrics.WorkerStopped("Killed");
|
||||||
|
metrics.HeartbeatFailed("session-1");
|
||||||
|
metrics.StreamDisconnected("ClientCancelled");
|
||||||
|
metrics.SessionClosed();
|
||||||
|
|
||||||
|
GatewayMetricsSnapshot snapshot = metrics.GetSnapshot();
|
||||||
|
|
||||||
|
Assert.Equal(0, snapshot.OpenSessions);
|
||||||
|
Assert.Equal(0, snapshot.WorkersRunning);
|
||||||
|
Assert.Equal(7, snapshot.EventQueueDepth);
|
||||||
|
Assert.Equal(1, snapshot.SessionsOpened);
|
||||||
|
Assert.Equal(1, snapshot.SessionsClosed);
|
||||||
|
Assert.Equal(2, snapshot.CommandsStarted);
|
||||||
|
Assert.Equal(1, snapshot.CommandsSucceeded);
|
||||||
|
Assert.Equal(1, snapshot.CommandsFailed);
|
||||||
|
Assert.Equal(2, snapshot.EventsReceived);
|
||||||
|
Assert.Equal(1, snapshot.QueueOverflows);
|
||||||
|
Assert.Equal(1, snapshot.Faults);
|
||||||
|
Assert.Equal(1, snapshot.WorkerKills);
|
||||||
|
Assert.Equal(1, snapshot.WorkerExits);
|
||||||
|
Assert.Equal(1, snapshot.HeartbeatFailures);
|
||||||
|
Assert.Equal(1, snapshot.StreamDisconnects);
|
||||||
|
Assert.Equal(1, snapshot.CommandFailuresByMethod["WriteSecured"]);
|
||||||
|
Assert.Equal(2, snapshot.EventsByFamily["OnDataChange"]);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void SetEventQueueDepth_RejectsNegativeDepth()
|
||||||
|
{
|
||||||
|
using GatewayMetrics metrics = new();
|
||||||
|
|
||||||
|
ArgumentOutOfRangeException exception = Assert.Throws<ArgumentOutOfRangeException>(
|
||||||
|
() => metrics.SetEventQueueDepth(-1));
|
||||||
|
|
||||||
|
Assert.Equal("depth", exception.ParamName);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user