Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 77eac95f33 | |||
| 015fa1f50d | |||
| dede407304 | |||
| 0d96963c99 | |||
| 3661420f0a |
@@ -206,13 +206,23 @@ accounting and a clear fan-out policy.
|
|||||||
Behavior:
|
Behavior:
|
||||||
|
|
||||||
1. Validate session id and authorize event access.
|
1. Validate session id and authorize event access.
|
||||||
2. Attach a stream cursor to the session event channel.
|
2. Attach the single active subscriber lease for the session.
|
||||||
3. Send events in worker sequence order.
|
3. Read worker events into a bounded public stream queue.
|
||||||
4. Stop on client cancellation, session close, or session fault.
|
4. Send events in worker sequence order.
|
||||||
5. Emit a terminal status when the session faults if gRPC status alone cannot
|
5. Stop on client cancellation, session close, or session fault.
|
||||||
|
6. Emit a terminal status when the session faults if gRPC status alone cannot
|
||||||
preserve the required details.
|
preserve the required details.
|
||||||
|
|
||||||
The gateway must not reorder events from one worker.
|
`EventStreamService` owns subscriber tracking and public stream backpressure.
|
||||||
|
The default policy allows one active subscriber per session. A second subscriber
|
||||||
|
is rejected with `EventSubscriberAlreadyActive`. Stream cancellation releases
|
||||||
|
the subscriber lease so a later stream can attach to the session.
|
||||||
|
|
||||||
|
The gateway must not reorder events from one worker. `EventStreamService` writes
|
||||||
|
mapped events to a bounded first-in, first-out queue and faults the session with
|
||||||
|
`EventQueueOverflow` if the queue fills. The gateway does not synthesize
|
||||||
|
`OperationComplete`; it forwards that family only when the worker reports a
|
||||||
|
native MXAccess `OperationComplete` event.
|
||||||
|
|
||||||
## Web Dashboard
|
## Web Dashboard
|
||||||
|
|
||||||
@@ -584,7 +594,8 @@ worker MXAccess event
|
|||||||
-> worker outbound event queue
|
-> worker outbound event queue
|
||||||
-> worker pipe writer
|
-> worker pipe writer
|
||||||
-> gateway read loop
|
-> gateway read loop
|
||||||
-> session event channel
|
-> worker client event queue
|
||||||
|
-> EventStreamService bounded stream queue
|
||||||
-> gRPC StreamEvents
|
-> gRPC StreamEvents
|
||||||
```
|
```
|
||||||
|
|
||||||
@@ -598,13 +609,15 @@ The gateway should record:
|
|||||||
|
|
||||||
Default backpressure policy for parity testing should be fail-fast:
|
Default backpressure policy for parity testing should be fail-fast:
|
||||||
|
|
||||||
1. If the session event channel fills, fault the session.
|
1. If the worker client event queue fills, fault the worker client.
|
||||||
|
2. If the public stream queue fills, fault the gateway session.
|
||||||
2. Preserve the overflow details in logs and metrics.
|
2. Preserve the overflow details in logs and metrics.
|
||||||
3. Do not silently drop data-change events.
|
3. Do not silently drop data-change events.
|
||||||
|
|
||||||
Do not set a production event-rate target before measurement. Emit event rate,
|
Do not set a production event-rate target before measurement. `GatewayMetrics`
|
||||||
queue depth, stream send latency, and overflow metrics. Later production modes
|
records received event counts by family, queue depth, stream disconnects, and
|
||||||
may support explicit coalescing by item handle as an opt-in behavior.
|
overflow counts. Later production modes may support explicit coalescing by item
|
||||||
|
handle as an opt-in behavior.
|
||||||
|
|
||||||
The gateway should not synthesize `OperationComplete` from write completion,
|
The gateway should not synthesize `OperationComplete` from write completion,
|
||||||
command replies, ASB completion queues, or completion-only status frames. Forward
|
command replies, ASB completion queues, or completion-only status frames. Forward
|
||||||
|
|||||||
+17
-12
@@ -107,6 +107,8 @@ worker, correlation, command, and client identity fields with redaction applied
|
|||||||
before values enter log state. `GatewayMetrics` exposes counters, gauges, and
|
before values enter log state. `GatewayMetrics` exposes counters, gauges, and
|
||||||
histograms through .NET `Meter` and a snapshot API that dashboard services can
|
histograms through .NET `Meter` and a snapshot API that dashboard services can
|
||||||
project without binding to a metrics exporter.
|
project without binding to a metrics exporter.
|
||||||
|
`DashboardSnapshotService` projects sessions, workers, metrics, faults, and
|
||||||
|
effective configuration into immutable DTOs for read-only dashboard rendering.
|
||||||
|
|
||||||
### Worker Process
|
### Worker Process
|
||||||
|
|
||||||
@@ -518,11 +520,7 @@ Worker policy:
|
|||||||
|
|
||||||
- bounded outbound event channel,
|
- bounded outbound event channel,
|
||||||
- never block MXAccess event handler on pipe writes,
|
- never block MXAccess event handler on pipe writes,
|
||||||
- if the outbound channel is full, apply configured policy:
|
- fail the worker session when the outbound channel is full.
|
||||||
- disconnect session,
|
|
||||||
- drop oldest low-priority data-change events,
|
|
||||||
- coalesce data changes by item handle,
|
|
||||||
- or block briefly then fault.
|
|
||||||
|
|
||||||
For full parity testing, default should be fail-fast rather than silent drop.
|
For full parity testing, default should be fail-fast rather than silent drop.
|
||||||
For production high-rate telemetry, add explicit coalescing modes.
|
For production high-rate telemetry, add explicit coalescing modes.
|
||||||
@@ -531,9 +529,15 @@ Gateway policy:
|
|||||||
|
|
||||||
- one event sequencer per session,
|
- one event sequencer per session,
|
||||||
- preserve per-session event order,
|
- preserve per-session event order,
|
||||||
- support multiple client event subscribers only if explicitly required,
|
- allow one active client event subscriber per session,
|
||||||
- apply backpressure from slow gRPC streams,
|
- reject a second subscriber with a clear session error,
|
||||||
- disconnect or coalesce according to client-selected mode.
|
- use a bounded `EventStreamService` queue between worker events and gRPC
|
||||||
|
writes,
|
||||||
|
- fault the session when the bounded stream queue overflows,
|
||||||
|
- detach the subscriber when the stream is canceled.
|
||||||
|
|
||||||
|
The gateway forwards only events reported by the worker. It does not synthesize
|
||||||
|
`OperationComplete` from write completion, command replies, or status frames.
|
||||||
|
|
||||||
## Isolation And Fault Handling
|
## Isolation And Fault Handling
|
||||||
|
|
||||||
@@ -855,10 +859,11 @@ translation code testable.
|
|||||||
The gateway maps `MxAccessGateway` to `MxAccessGatewayService`. The service
|
The gateway maps `MxAccessGateway` to `MxAccessGatewayService`. The service
|
||||||
implements `OpenSession`, `CloseSession`, `Invoke`, and `StreamEvents` by
|
implements `OpenSession`, `CloseSession`, `Invoke`, and `StreamEvents` by
|
||||||
validating public requests, delegating session work to `ISessionManager`, and
|
validating public requests, delegating session work to `ISessionManager`, and
|
||||||
using explicit mapper code for public-to-worker commands, worker replies, and
|
using explicit mapper code for public-to-worker commands and worker replies.
|
||||||
events. Missing sessions and transport failures return gRPC status errors;
|
`StreamEvents` delegates subscriber ownership, ordering, and backpressure to
|
||||||
worker command replies preserve MXAccess HRESULT and status details in the
|
`EventStreamService`. Missing sessions and transport failures return gRPC
|
||||||
public reply.
|
status errors; worker command replies preserve MXAccess HRESULT and status
|
||||||
|
details in the public reply.
|
||||||
|
|
||||||
## C# Worker Versus C++ Worker
|
## C# Worker Versus C++ Worker
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,9 @@
|
|||||||
|
namespace MxGateway.Server.Dashboard;
|
||||||
|
|
||||||
|
public sealed record DashboardFaultSummary(
|
||||||
|
string Source,
|
||||||
|
string? SessionId,
|
||||||
|
int? WorkerProcessId,
|
||||||
|
string State,
|
||||||
|
string Message,
|
||||||
|
DateTimeOffset ObservedAt);
|
||||||
@@ -0,0 +1,6 @@
|
|||||||
|
namespace MxGateway.Server.Dashboard;
|
||||||
|
|
||||||
|
public sealed record DashboardMetricSummary(
|
||||||
|
string Name,
|
||||||
|
long Value,
|
||||||
|
string? Dimension = null);
|
||||||
@@ -0,0 +1,34 @@
|
|||||||
|
using MxGateway.Server.Diagnostics;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Dashboard;
|
||||||
|
|
||||||
|
internal static class DashboardRedactor
|
||||||
|
{
|
||||||
|
private static readonly string[] SensitiveTextMarkers =
|
||||||
|
[
|
||||||
|
"apikey",
|
||||||
|
"api_key",
|
||||||
|
"authorization",
|
||||||
|
"credential",
|
||||||
|
"password",
|
||||||
|
"secret",
|
||||||
|
"token",
|
||||||
|
];
|
||||||
|
|
||||||
|
public static string? Redact(string? value)
|
||||||
|
{
|
||||||
|
if (string.IsNullOrWhiteSpace(value))
|
||||||
|
{
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (value.Contains("mxgw_", StringComparison.OrdinalIgnoreCase))
|
||||||
|
{
|
||||||
|
return GatewayLogRedactor.RedactClientIdentity(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
return SensitiveTextMarkers.Any(marker => value.Contains(marker, StringComparison.OrdinalIgnoreCase))
|
||||||
|
? GatewayLogRedactor.RedactedValue
|
||||||
|
: value;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,11 @@
|
|||||||
|
namespace MxGateway.Server.Dashboard;
|
||||||
|
|
||||||
|
public static class DashboardServiceCollectionExtensions
|
||||||
|
{
|
||||||
|
public static IServiceCollection AddGatewayDashboard(this IServiceCollection services)
|
||||||
|
{
|
||||||
|
services.AddSingleton<IDashboardSnapshotService, DashboardSnapshotService>();
|
||||||
|
|
||||||
|
return services;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,19 @@
|
|||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
using MxGateway.Server.Workers;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Dashboard;
|
||||||
|
|
||||||
|
public sealed record DashboardSessionSummary(
|
||||||
|
string SessionId,
|
||||||
|
string BackendName,
|
||||||
|
SessionState State,
|
||||||
|
string? ClientIdentity,
|
||||||
|
string? ClientSessionName,
|
||||||
|
string? ClientCorrelationId,
|
||||||
|
DateTimeOffset OpenedAt,
|
||||||
|
DateTimeOffset LastClientActivityAt,
|
||||||
|
DateTimeOffset? LeaseExpiresAt,
|
||||||
|
int? WorkerProcessId,
|
||||||
|
WorkerClientState? WorkerState,
|
||||||
|
DateTimeOffset? LastWorkerHeartbeatAt,
|
||||||
|
string? LastFault);
|
||||||
@@ -0,0 +1,15 @@
|
|||||||
|
using MxGateway.Server.Configuration;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Dashboard;
|
||||||
|
|
||||||
|
public sealed record DashboardSnapshot(
|
||||||
|
DateTimeOffset GeneratedAt,
|
||||||
|
DateTimeOffset GatewayStartedAt,
|
||||||
|
TimeSpan GatewayUptime,
|
||||||
|
string GatewayStatus,
|
||||||
|
string GatewayVersion,
|
||||||
|
IReadOnlyList<DashboardSessionSummary> Sessions,
|
||||||
|
IReadOnlyList<DashboardWorkerSummary> Workers,
|
||||||
|
IReadOnlyList<DashboardMetricSummary> Metrics,
|
||||||
|
IReadOnlyList<DashboardFaultSummary> Faults,
|
||||||
|
EffectiveGatewayConfiguration Configuration);
|
||||||
@@ -0,0 +1,196 @@
|
|||||||
|
using System.Runtime.CompilerServices;
|
||||||
|
using Microsoft.Extensions.Options;
|
||||||
|
using MxGateway.Server.Configuration;
|
||||||
|
using MxGateway.Server.Metrics;
|
||||||
|
using MxGateway.Server.Sessions;
|
||||||
|
using MxGateway.Server.Workers;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Dashboard;
|
||||||
|
|
||||||
|
public sealed class DashboardSnapshotService : IDashboardSnapshotService
|
||||||
|
{
|
||||||
|
private const string HealthyStatus = "Healthy";
|
||||||
|
|
||||||
|
private readonly ISessionRegistry _sessionRegistry;
|
||||||
|
private readonly GatewayMetrics _metrics;
|
||||||
|
private readonly IGatewayConfigurationProvider _configurationProvider;
|
||||||
|
private readonly TimeProvider _timeProvider;
|
||||||
|
private readonly DateTimeOffset _gatewayStartedAt;
|
||||||
|
private readonly TimeSpan _snapshotInterval;
|
||||||
|
private readonly int _recentFaultLimit;
|
||||||
|
private readonly int _recentSessionLimit;
|
||||||
|
|
||||||
|
public DashboardSnapshotService(
|
||||||
|
ISessionRegistry sessionRegistry,
|
||||||
|
GatewayMetrics metrics,
|
||||||
|
IGatewayConfigurationProvider configurationProvider,
|
||||||
|
IOptions<GatewayOptions> options,
|
||||||
|
TimeProvider? timeProvider = null)
|
||||||
|
{
|
||||||
|
_sessionRegistry = sessionRegistry ?? throw new ArgumentNullException(nameof(sessionRegistry));
|
||||||
|
_metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
|
||||||
|
_configurationProvider = configurationProvider ?? throw new ArgumentNullException(nameof(configurationProvider));
|
||||||
|
ArgumentNullException.ThrowIfNull(options);
|
||||||
|
|
||||||
|
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||||
|
_gatewayStartedAt = _timeProvider.GetUtcNow();
|
||||||
|
_snapshotInterval = TimeSpan.FromMilliseconds(options.Value.Dashboard.SnapshotIntervalMilliseconds);
|
||||||
|
_recentFaultLimit = options.Value.Dashboard.RecentFaultLimit;
|
||||||
|
_recentSessionLimit = options.Value.Dashboard.RecentSessionLimit;
|
||||||
|
}
|
||||||
|
|
||||||
|
public DashboardSnapshot GetSnapshot()
|
||||||
|
{
|
||||||
|
DateTimeOffset generatedAt = _timeProvider.GetUtcNow();
|
||||||
|
IReadOnlyList<GatewaySession> sessions = _sessionRegistry.Snapshot()
|
||||||
|
.OrderByDescending(session => session.OpenedAt)
|
||||||
|
.ToArray();
|
||||||
|
IReadOnlyList<DashboardSessionSummary> sessionSummaries = sessions
|
||||||
|
.Take(ResolveLimit(_recentSessionLimit))
|
||||||
|
.Select(CreateSessionSummary)
|
||||||
|
.ToArray();
|
||||||
|
IReadOnlyList<DashboardWorkerSummary> workerSummaries = sessions
|
||||||
|
.Where(session => session.WorkerClient is not null)
|
||||||
|
.Select(CreateWorkerSummary)
|
||||||
|
.ToArray();
|
||||||
|
GatewayMetricsSnapshot metricsSnapshot = _metrics.GetSnapshot();
|
||||||
|
|
||||||
|
return new DashboardSnapshot(
|
||||||
|
GeneratedAt: generatedAt,
|
||||||
|
GatewayStartedAt: _gatewayStartedAt,
|
||||||
|
GatewayUptime: generatedAt - _gatewayStartedAt,
|
||||||
|
GatewayStatus: HealthyStatus,
|
||||||
|
GatewayVersion: typeof(DashboardSnapshotService).Assembly.GetName().Version?.ToString() ?? "unknown",
|
||||||
|
Sessions: sessionSummaries,
|
||||||
|
Workers: workerSummaries,
|
||||||
|
Metrics: CreateMetricSummaries(metricsSnapshot),
|
||||||
|
Faults: CreateFaultSummaries(sessions, generatedAt),
|
||||||
|
Configuration: _configurationProvider.GetEffectiveConfiguration());
|
||||||
|
}
|
||||||
|
|
||||||
|
public async IAsyncEnumerable<DashboardSnapshot> WatchSnapshotsAsync(
|
||||||
|
[EnumeratorCancellation] CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
if (cancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
yield break;
|
||||||
|
}
|
||||||
|
|
||||||
|
yield return GetSnapshot();
|
||||||
|
|
||||||
|
using PeriodicTimer timer = new(_snapshotInterval, _timeProvider);
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
bool hasNext;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
hasNext = await timer.WaitForNextTickAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException)
|
||||||
|
{
|
||||||
|
yield break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!hasNext)
|
||||||
|
{
|
||||||
|
yield break;
|
||||||
|
}
|
||||||
|
|
||||||
|
yield return GetSnapshot();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static DashboardSessionSummary CreateSessionSummary(GatewaySession session)
|
||||||
|
{
|
||||||
|
IWorkerClient? workerClient = session.WorkerClient;
|
||||||
|
|
||||||
|
return new DashboardSessionSummary(
|
||||||
|
SessionId: session.SessionId,
|
||||||
|
BackendName: session.BackendName,
|
||||||
|
State: session.State,
|
||||||
|
ClientIdentity: DashboardRedactor.Redact(session.ClientIdentity),
|
||||||
|
ClientSessionName: DashboardRedactor.Redact(session.ClientSessionName),
|
||||||
|
ClientCorrelationId: DashboardRedactor.Redact(session.ClientCorrelationId),
|
||||||
|
OpenedAt: session.OpenedAt,
|
||||||
|
LastClientActivityAt: session.LastClientActivityAt,
|
||||||
|
LeaseExpiresAt: session.LeaseExpiresAt,
|
||||||
|
WorkerProcessId: workerClient?.ProcessId,
|
||||||
|
WorkerState: workerClient?.State,
|
||||||
|
LastWorkerHeartbeatAt: workerClient?.LastHeartbeatAt,
|
||||||
|
LastFault: DashboardRedactor.Redact(session.FinalFault));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static DashboardWorkerSummary CreateWorkerSummary(GatewaySession session)
|
||||||
|
{
|
||||||
|
IWorkerClient workerClient = session.WorkerClient!;
|
||||||
|
|
||||||
|
return new DashboardWorkerSummary(
|
||||||
|
SessionId: session.SessionId,
|
||||||
|
ProcessId: workerClient.ProcessId,
|
||||||
|
State: workerClient.State,
|
||||||
|
LastHeartbeatAt: workerClient.LastHeartbeatAt,
|
||||||
|
LastFault: DashboardRedactor.Redact(session.FinalFault));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static IReadOnlyList<DashboardMetricSummary> CreateMetricSummaries(GatewayMetricsSnapshot snapshot)
|
||||||
|
{
|
||||||
|
List<DashboardMetricSummary> metrics =
|
||||||
|
[
|
||||||
|
new("mxgateway.sessions.open", snapshot.OpenSessions),
|
||||||
|
new("mxgateway.workers.running", snapshot.WorkersRunning),
|
||||||
|
new("mxgateway.events.queue.depth", snapshot.EventQueueDepth),
|
||||||
|
new("mxgateway.sessions.opened", snapshot.SessionsOpened),
|
||||||
|
new("mxgateway.sessions.closed", snapshot.SessionsClosed),
|
||||||
|
new("mxgateway.commands.started", snapshot.CommandsStarted),
|
||||||
|
new("mxgateway.commands.succeeded", snapshot.CommandsSucceeded),
|
||||||
|
new("mxgateway.commands.failed", snapshot.CommandsFailed),
|
||||||
|
new("mxgateway.events.received", snapshot.EventsReceived),
|
||||||
|
new("mxgateway.queues.overflows", snapshot.QueueOverflows),
|
||||||
|
new("mxgateway.faults", snapshot.Faults),
|
||||||
|
new("mxgateway.workers.killed", snapshot.WorkerKills),
|
||||||
|
new("mxgateway.workers.exited", snapshot.WorkerExits),
|
||||||
|
new("mxgateway.heartbeats.failed", snapshot.HeartbeatFailures),
|
||||||
|
new("mxgateway.grpc.streams.disconnected", snapshot.StreamDisconnects),
|
||||||
|
];
|
||||||
|
|
||||||
|
metrics.AddRange(snapshot.CommandFailuresByMethod
|
||||||
|
.OrderBy(entry => entry.Key, StringComparer.OrdinalIgnoreCase)
|
||||||
|
.Select(entry => new DashboardMetricSummary("mxgateway.commands.failed", entry.Value, entry.Key)));
|
||||||
|
metrics.AddRange(snapshot.EventsByFamily
|
||||||
|
.OrderBy(entry => entry.Key, StringComparer.OrdinalIgnoreCase)
|
||||||
|
.Select(entry => new DashboardMetricSummary("mxgateway.events.received", entry.Value, entry.Key)));
|
||||||
|
|
||||||
|
return metrics;
|
||||||
|
}
|
||||||
|
|
||||||
|
private IReadOnlyList<DashboardFaultSummary> CreateFaultSummaries(
|
||||||
|
IReadOnlyList<GatewaySession> sessions,
|
||||||
|
DateTimeOffset generatedAt)
|
||||||
|
{
|
||||||
|
return sessions
|
||||||
|
.Where(HasFault)
|
||||||
|
.Take(ResolveLimit(_recentFaultLimit))
|
||||||
|
.Select(session => new DashboardFaultSummary(
|
||||||
|
Source: session.WorkerClient?.State == WorkerClientState.Faulted ? "Worker" : "Session",
|
||||||
|
SessionId: session.SessionId,
|
||||||
|
WorkerProcessId: session.WorkerProcessId,
|
||||||
|
State: session.WorkerClient?.State == WorkerClientState.Faulted
|
||||||
|
? WorkerClientState.Faulted.ToString()
|
||||||
|
: session.State.ToString(),
|
||||||
|
Message: DashboardRedactor.Redact(session.FinalFault) ?? "Faulted",
|
||||||
|
ObservedAt: generatedAt))
|
||||||
|
.ToArray();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static bool HasFault(GatewaySession session)
|
||||||
|
{
|
||||||
|
return session.State == MxGateway.Contracts.Proto.SessionState.Faulted
|
||||||
|
|| session.WorkerClient?.State == WorkerClientState.Faulted
|
||||||
|
|| !string.IsNullOrWhiteSpace(session.FinalFault);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int ResolveLimit(int configuredLimit)
|
||||||
|
{
|
||||||
|
return configuredLimit < 0 ? 0 : configuredLimit;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,10 @@
|
|||||||
|
using MxGateway.Server.Workers;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Dashboard;
|
||||||
|
|
||||||
|
public sealed record DashboardWorkerSummary(
|
||||||
|
string SessionId,
|
||||||
|
int? ProcessId,
|
||||||
|
WorkerClientState State,
|
||||||
|
DateTimeOffset LastHeartbeatAt,
|
||||||
|
string? LastFault);
|
||||||
@@ -0,0 +1,8 @@
|
|||||||
|
namespace MxGateway.Server.Dashboard;
|
||||||
|
|
||||||
|
public interface IDashboardSnapshotService
|
||||||
|
{
|
||||||
|
DashboardSnapshot GetSnapshot();
|
||||||
|
|
||||||
|
IAsyncEnumerable<DashboardSnapshot> WatchSnapshotsAsync(CancellationToken cancellationToken);
|
||||||
|
}
|
||||||
@@ -1,5 +1,6 @@
|
|||||||
using MxGateway.Contracts;
|
using MxGateway.Contracts;
|
||||||
using MxGateway.Server.Configuration;
|
using MxGateway.Server.Configuration;
|
||||||
|
using MxGateway.Server.Dashboard;
|
||||||
using MxGateway.Server.Diagnostics;
|
using MxGateway.Server.Diagnostics;
|
||||||
using MxGateway.Server.Grpc;
|
using MxGateway.Server.Grpc;
|
||||||
using MxGateway.Server.Metrics;
|
using MxGateway.Server.Metrics;
|
||||||
@@ -34,8 +35,10 @@ public static class GatewayApplication
|
|||||||
builder.Services.AddSingleton<GatewayMetrics>();
|
builder.Services.AddSingleton<GatewayMetrics>();
|
||||||
builder.Services.AddSingleton<MxAccessGrpcMapper>();
|
builder.Services.AddSingleton<MxAccessGrpcMapper>();
|
||||||
builder.Services.AddSingleton<MxAccessGrpcRequestValidator>();
|
builder.Services.AddSingleton<MxAccessGrpcRequestValidator>();
|
||||||
|
builder.Services.AddSingleton<IEventStreamService, EventStreamService>();
|
||||||
builder.Services.AddWorkerProcessLauncher();
|
builder.Services.AddWorkerProcessLauncher();
|
||||||
builder.Services.AddGatewaySessions();
|
builder.Services.AddGatewaySessions();
|
||||||
|
builder.Services.AddGatewayDashboard();
|
||||||
|
|
||||||
return builder;
|
return builder;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,140 @@
|
|||||||
|
using System.Runtime.CompilerServices;
|
||||||
|
using System.Threading.Channels;
|
||||||
|
using Microsoft.Extensions.Options;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
using MxGateway.Server.Configuration;
|
||||||
|
using MxGateway.Server.Metrics;
|
||||||
|
using MxGateway.Server.Sessions;
|
||||||
|
using MxGateway.Server.Workers;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Grpc;
|
||||||
|
|
||||||
|
public sealed class EventStreamService(
|
||||||
|
ISessionManager sessionManager,
|
||||||
|
IOptions<GatewayOptions> options,
|
||||||
|
MxAccessGrpcMapper mapper,
|
||||||
|
GatewayMetrics metrics,
|
||||||
|
ILogger<EventStreamService> logger) : IEventStreamService
|
||||||
|
{
|
||||||
|
public async IAsyncEnumerable<MxEvent> StreamEventsAsync(
|
||||||
|
StreamEventsRequest request,
|
||||||
|
[EnumeratorCancellation] CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
if (!sessionManager.TryGetSession(request.SessionId, out GatewaySession session))
|
||||||
|
{
|
||||||
|
throw new SessionManagerException(
|
||||||
|
SessionManagerErrorCode.SessionNotFound,
|
||||||
|
$"Session {request.SessionId} was not found.");
|
||||||
|
}
|
||||||
|
|
||||||
|
using IDisposable subscriber = session.AttachEventSubscriber(
|
||||||
|
options.Value.Sessions.AllowMultipleEventSubscribers);
|
||||||
|
using CancellationTokenSource streamCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
|
||||||
|
|
||||||
|
int streamQueueDepth = 0;
|
||||||
|
Channel<MxEvent> eventQueue = Channel.CreateBounded<MxEvent>(
|
||||||
|
new BoundedChannelOptions(options.Value.Events.QueueCapacity)
|
||||||
|
{
|
||||||
|
SingleReader = true,
|
||||||
|
SingleWriter = true,
|
||||||
|
FullMode = BoundedChannelFullMode.Wait,
|
||||||
|
AllowSynchronousContinuations = false,
|
||||||
|
});
|
||||||
|
Task producerTask = ProduceEventsAsync(
|
||||||
|
session,
|
||||||
|
request.AfterWorkerSequence,
|
||||||
|
eventQueue.Writer,
|
||||||
|
() =>
|
||||||
|
{
|
||||||
|
int depth = Interlocked.Increment(ref streamQueueDepth);
|
||||||
|
metrics.SetEventQueueDepth(depth);
|
||||||
|
},
|
||||||
|
streamCts.Token);
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await foreach (MxEvent mxEvent in eventQueue.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
|
||||||
|
{
|
||||||
|
int depth = Math.Max(0, Interlocked.Decrement(ref streamQueueDepth));
|
||||||
|
metrics.SetEventQueueDepth(depth);
|
||||||
|
yield return mxEvent;
|
||||||
|
}
|
||||||
|
|
||||||
|
await producerTask.ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
await streamCts.CancelAsync().ConfigureAwait(false);
|
||||||
|
subscriber.Dispose();
|
||||||
|
metrics.StreamDisconnected("Detached");
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await producerTask.ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) when (streamCts.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
catch (Exception exception)
|
||||||
|
{
|
||||||
|
logger.LogDebug(
|
||||||
|
exception,
|
||||||
|
"Event stream producer stopped for session {SessionId}.",
|
||||||
|
request.SessionId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task ProduceEventsAsync(
|
||||||
|
GatewaySession session,
|
||||||
|
ulong afterWorkerSequence,
|
||||||
|
ChannelWriter<MxEvent> writer,
|
||||||
|
Action eventQueued,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await foreach (WorkerEvent workerEvent in session
|
||||||
|
.ReadEventsAsync(cancellationToken)
|
||||||
|
.WithCancellation(cancellationToken)
|
||||||
|
.ConfigureAwait(false))
|
||||||
|
{
|
||||||
|
MxEvent publicEvent = mapper.MapEvent(workerEvent);
|
||||||
|
if (publicEvent.WorkerSequence <= afterWorkerSequence)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!writer.TryWrite(publicEvent))
|
||||||
|
{
|
||||||
|
string message = $"Session {session.SessionId} event stream queue overflowed.";
|
||||||
|
session.MarkFaulted(message);
|
||||||
|
metrics.QueueOverflow("grpc-event-stream");
|
||||||
|
metrics.Fault(SessionManagerErrorCode.EventQueueOverflow.ToString());
|
||||||
|
writer.TryComplete(new SessionManagerException(
|
||||||
|
SessionManagerErrorCode.EventQueueOverflow,
|
||||||
|
message));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
eventQueued();
|
||||||
|
}
|
||||||
|
|
||||||
|
writer.TryComplete();
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
writer.TryComplete();
|
||||||
|
}
|
||||||
|
catch (Exception exception)
|
||||||
|
{
|
||||||
|
if (exception is WorkerClientException)
|
||||||
|
{
|
||||||
|
session.MarkFaulted(exception.Message);
|
||||||
|
metrics.Fault(WorkerClientErrorCode.WorkerFaulted.ToString());
|
||||||
|
}
|
||||||
|
|
||||||
|
writer.TryComplete(exception);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,10 @@
|
|||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Grpc;
|
||||||
|
|
||||||
|
public interface IEventStreamService
|
||||||
|
{
|
||||||
|
IAsyncEnumerable<MxEvent> StreamEventsAsync(
|
||||||
|
StreamEventsRequest request,
|
||||||
|
CancellationToken cancellationToken);
|
||||||
|
}
|
||||||
@@ -12,6 +12,7 @@ public sealed class MxAccessGatewayService(
|
|||||||
IGatewayRequestIdentityAccessor identityAccessor,
|
IGatewayRequestIdentityAccessor identityAccessor,
|
||||||
MxAccessGrpcRequestValidator requestValidator,
|
MxAccessGrpcRequestValidator requestValidator,
|
||||||
MxAccessGrpcMapper mapper,
|
MxAccessGrpcMapper mapper,
|
||||||
|
IEventStreamService eventStreamService,
|
||||||
ILogger<MxAccessGatewayService> logger) : MxAccessGateway.MxAccessGatewayBase
|
ILogger<MxAccessGatewayService> logger) : MxAccessGateway.MxAccessGatewayBase
|
||||||
{
|
{
|
||||||
public override async Task<OpenSessionReply> OpenSession(
|
public override async Task<OpenSessionReply> OpenSession(
|
||||||
@@ -102,17 +103,11 @@ public sealed class MxAccessGatewayService(
|
|||||||
try
|
try
|
||||||
{
|
{
|
||||||
requestValidator.ValidateStreamEvents(request);
|
requestValidator.ValidateStreamEvents(request);
|
||||||
await foreach (WorkerEvent workerEvent in sessionManager
|
await foreach (MxEvent publicEvent in eventStreamService
|
||||||
.ReadEventsAsync(request.SessionId, context.CancellationToken)
|
.StreamEventsAsync(request, context.CancellationToken)
|
||||||
.WithCancellation(context.CancellationToken)
|
.WithCancellation(context.CancellationToken)
|
||||||
.ConfigureAwait(false))
|
.ConfigureAwait(false))
|
||||||
{
|
{
|
||||||
MxEvent publicEvent = mapper.MapEvent(workerEvent);
|
|
||||||
if (publicEvent.WorkerSequence <= request.AfterWorkerSequence)
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
await responseStream.WriteAsync(publicEvent).ConfigureAwait(false);
|
await responseStream.WriteAsync(publicEvent).ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -154,6 +149,8 @@ public sealed class MxAccessGatewayService(
|
|||||||
{
|
{
|
||||||
SessionManagerErrorCode.SessionNotFound => StatusCode.NotFound,
|
SessionManagerErrorCode.SessionNotFound => StatusCode.NotFound,
|
||||||
SessionManagerErrorCode.SessionNotReady => StatusCode.FailedPrecondition,
|
SessionManagerErrorCode.SessionNotReady => StatusCode.FailedPrecondition,
|
||||||
|
SessionManagerErrorCode.EventSubscriberAlreadyActive => StatusCode.ResourceExhausted,
|
||||||
|
SessionManagerErrorCode.EventQueueOverflow => StatusCode.ResourceExhausted,
|
||||||
SessionManagerErrorCode.SessionLimitExceeded => StatusCode.ResourceExhausted,
|
SessionManagerErrorCode.SessionLimitExceeded => StatusCode.ResourceExhausted,
|
||||||
SessionManagerErrorCode.OpenFailed => StatusCode.Unavailable,
|
SessionManagerErrorCode.OpenFailed => StatusCode.Unavailable,
|
||||||
SessionManagerErrorCode.CloseFailed => StatusCode.Unavailable,
|
SessionManagerErrorCode.CloseFailed => StatusCode.Unavailable,
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ public sealed class GatewaySession
|
|||||||
private DateTimeOffset _lastClientActivityAt;
|
private DateTimeOffset _lastClientActivityAt;
|
||||||
private DateTimeOffset? _leaseExpiresAt;
|
private DateTimeOffset? _leaseExpiresAt;
|
||||||
private bool _closeStarted;
|
private bool _closeStarted;
|
||||||
|
private int _activeEventSubscriberCount;
|
||||||
|
|
||||||
public GatewaySession(
|
public GatewaySession(
|
||||||
string sessionId,
|
string sessionId,
|
||||||
@@ -131,6 +132,17 @@ public sealed class GatewaySession
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int ActiveEventSubscriberCount
|
||||||
|
{
|
||||||
|
get
|
||||||
|
{
|
||||||
|
lock (_syncRoot)
|
||||||
|
{
|
||||||
|
return _activeEventSubscriberCount;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void AttachWorkerClient(IWorkerClient workerClient)
|
public void AttachWorkerClient(IWorkerClient workerClient)
|
||||||
{
|
{
|
||||||
ArgumentNullException.ThrowIfNull(workerClient);
|
ArgumentNullException.ThrowIfNull(workerClient);
|
||||||
@@ -202,6 +214,29 @@ public sealed class GatewaySession
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public IDisposable AttachEventSubscriber(bool allowMultipleSubscribers)
|
||||||
|
{
|
||||||
|
lock (_syncRoot)
|
||||||
|
{
|
||||||
|
if (_state != SessionState.Ready || _workerClient?.State != WorkerClientState.Ready)
|
||||||
|
{
|
||||||
|
throw new SessionManagerException(
|
||||||
|
SessionManagerErrorCode.SessionNotReady,
|
||||||
|
$"Session {SessionId} is not ready for event streaming. Current state is {_state}.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!allowMultipleSubscribers && _activeEventSubscriberCount > 0)
|
||||||
|
{
|
||||||
|
throw new SessionManagerException(
|
||||||
|
SessionManagerErrorCode.EventSubscriberAlreadyActive,
|
||||||
|
$"Session {SessionId} already has an active event stream subscriber.");
|
||||||
|
}
|
||||||
|
|
||||||
|
_activeEventSubscriberCount++;
|
||||||
|
return new EventSubscriberLease(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public async Task<WorkerCommandReply> InvokeAsync(
|
public async Task<WorkerCommandReply> InvokeAsync(
|
||||||
WorkerCommand command,
|
WorkerCommand command,
|
||||||
CancellationToken cancellationToken)
|
CancellationToken cancellationToken)
|
||||||
@@ -287,4 +322,31 @@ public sealed class GatewaySession
|
|||||||
return _workerClient;
|
return _workerClient;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void DetachEventSubscriber()
|
||||||
|
{
|
||||||
|
lock (_syncRoot)
|
||||||
|
{
|
||||||
|
if (_activeEventSubscriberCount > 0)
|
||||||
|
{
|
||||||
|
_activeEventSubscriberCount--;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class EventSubscriberLease(GatewaySession session) : IDisposable
|
||||||
|
{
|
||||||
|
private bool _disposed;
|
||||||
|
|
||||||
|
public void Dispose()
|
||||||
|
{
|
||||||
|
if (_disposed)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
session.DetachEventSubscriber();
|
||||||
|
_disposed = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,6 +4,8 @@ public enum SessionManagerErrorCode
|
|||||||
{
|
{
|
||||||
SessionNotFound,
|
SessionNotFound,
|
||||||
SessionNotReady,
|
SessionNotReady,
|
||||||
|
EventSubscriberAlreadyActive,
|
||||||
|
EventQueueOverflow,
|
||||||
SessionLimitExceeded,
|
SessionLimitExceeded,
|
||||||
OpenFailed,
|
OpenFailed,
|
||||||
CloseFailed,
|
CloseFailed,
|
||||||
|
|||||||
@@ -29,6 +29,7 @@ public sealed class WorkerClient : IWorkerClient
|
|||||||
private WorkerClientState _state;
|
private WorkerClientState _state;
|
||||||
private DateTimeOffset _lastHeartbeatAt;
|
private DateTimeOffset _lastHeartbeatAt;
|
||||||
private int? _processId;
|
private int? _processId;
|
||||||
|
private int _eventQueueDepth;
|
||||||
private Task? _readLoopTask;
|
private Task? _readLoopTask;
|
||||||
private Task? _writeLoopTask;
|
private Task? _writeLoopTask;
|
||||||
private Task? _heartbeatLoopTask;
|
private Task? _heartbeatLoopTask;
|
||||||
@@ -197,6 +198,8 @@ public sealed class WorkerClient : IWorkerClient
|
|||||||
{
|
{
|
||||||
await foreach (WorkerEvent workerEvent in _events.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
|
await foreach (WorkerEvent workerEvent in _events.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
|
||||||
{
|
{
|
||||||
|
int queueDepth = Math.Max(0, Interlocked.Decrement(ref _eventQueueDepth));
|
||||||
|
_metrics?.SetEventQueueDepth(queueDepth);
|
||||||
yield return workerEvent;
|
yield return workerEvent;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -394,11 +397,6 @@ public sealed class WorkerClient : IWorkerClient
|
|||||||
_metrics?.EventReceived(SessionId, workerEvent.Event.Family.ToString());
|
_metrics?.EventReceived(SessionId, workerEvent.Event.Family.ToString());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!await _events.Writer.WaitToWriteAsync(cancellationToken).ConfigureAwait(false))
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!_events.Writer.TryWrite(workerEvent))
|
if (!_events.Writer.TryWrite(workerEvent))
|
||||||
{
|
{
|
||||||
_metrics?.QueueOverflow("worker-events");
|
_metrics?.QueueOverflow("worker-events");
|
||||||
@@ -406,7 +404,11 @@ public sealed class WorkerClient : IWorkerClient
|
|||||||
WorkerClientErrorCode.ProtocolViolation,
|
WorkerClientErrorCode.ProtocolViolation,
|
||||||
"Worker event channel rejected an event.",
|
"Worker event channel rejected an event.",
|
||||||
null);
|
null);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int queueDepth = Interlocked.Increment(ref _eventQueueDepth);
|
||||||
|
_metrics?.SetEventQueueDepth(queueDepth);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void CompleteCommand(WorkerEnvelope envelope)
|
private void CompleteCommand(WorkerEnvelope envelope)
|
||||||
|
|||||||
@@ -0,0 +1,290 @@
|
|||||||
|
using Microsoft.Extensions.Options;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
using MxGateway.Server.Configuration;
|
||||||
|
using MxGateway.Server.Dashboard;
|
||||||
|
using MxGateway.Server.Metrics;
|
||||||
|
using MxGateway.Server.Sessions;
|
||||||
|
using MxGateway.Server.Workers;
|
||||||
|
|
||||||
|
namespace MxGateway.Tests.Gateway.Dashboard;
|
||||||
|
|
||||||
|
public sealed class DashboardSnapshotServiceTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public void GetSnapshot_WhenRegistryEmpty_ReturnsEmptyOperationalState()
|
||||||
|
{
|
||||||
|
using GatewayMetrics metrics = new();
|
||||||
|
DashboardSnapshotService service = CreateService(new SessionRegistry(), metrics);
|
||||||
|
|
||||||
|
DashboardSnapshot snapshot = service.GetSnapshot();
|
||||||
|
|
||||||
|
Assert.Empty(snapshot.Sessions);
|
||||||
|
Assert.Empty(snapshot.Workers);
|
||||||
|
Assert.Empty(snapshot.Faults);
|
||||||
|
Assert.Contains(snapshot.Metrics, metric => metric.Name == "mxgateway.sessions.open" && metric.Value == 0);
|
||||||
|
Assert.Equal("Healthy", snapshot.GatewayStatus);
|
||||||
|
Assert.NotNull(snapshot.Configuration);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void GetSnapshot_ProjectsActiveAndFaultedSessionsWorkersMetricsAndFaults()
|
||||||
|
{
|
||||||
|
SessionRegistry registry = new();
|
||||||
|
GatewaySession activeSession = CreateSession(
|
||||||
|
"session-active",
|
||||||
|
"client-one",
|
||||||
|
DateTimeOffset.Parse("2026-04-26T10:00:00Z"));
|
||||||
|
activeSession.AttachWorkerClient(new FakeWorkerClient("session-active", 1201, WorkerClientState.Ready));
|
||||||
|
activeSession.MarkReady();
|
||||||
|
GatewaySession faultedSession = CreateSession(
|
||||||
|
"session-faulted",
|
||||||
|
"client-two",
|
||||||
|
DateTimeOffset.Parse("2026-04-26T10:01:00Z"));
|
||||||
|
faultedSession.AttachWorkerClient(new FakeWorkerClient("session-faulted", 1202, WorkerClientState.Faulted));
|
||||||
|
faultedSession.MarkFaulted("worker pipe disconnected");
|
||||||
|
registry.TryAdd(activeSession);
|
||||||
|
registry.TryAdd(faultedSession);
|
||||||
|
using GatewayMetrics metrics = new();
|
||||||
|
metrics.SessionOpened();
|
||||||
|
metrics.SessionOpened();
|
||||||
|
metrics.CommandStarted("Register");
|
||||||
|
metrics.CommandFailed("Register", "WorkerFaulted", TimeSpan.FromMilliseconds(7));
|
||||||
|
metrics.EventReceived("session-active", "OnDataChange");
|
||||||
|
metrics.Fault("WorkerFaulted");
|
||||||
|
DashboardSnapshotService service = CreateService(registry, metrics);
|
||||||
|
|
||||||
|
DashboardSnapshot snapshot = service.GetSnapshot();
|
||||||
|
|
||||||
|
Assert.Equal(2, snapshot.Sessions.Count);
|
||||||
|
Assert.Equal("session-faulted", snapshot.Sessions[0].SessionId);
|
||||||
|
Assert.Equal(SessionState.Faulted, snapshot.Sessions[0].State);
|
||||||
|
Assert.Equal(2, snapshot.Workers.Count);
|
||||||
|
Assert.Contains(snapshot.Metrics, metric => metric.Name == "mxgateway.commands.started" && metric.Value == 1);
|
||||||
|
Assert.Contains(
|
||||||
|
snapshot.Metrics,
|
||||||
|
metric => metric.Name == "mxgateway.events.received"
|
||||||
|
&& metric.Dimension == "OnDataChange"
|
||||||
|
&& metric.Value == 1);
|
||||||
|
DashboardFaultSummary fault = Assert.Single(snapshot.Faults);
|
||||||
|
Assert.Equal("Worker", fault.Source);
|
||||||
|
Assert.Equal("session-faulted", fault.SessionId);
|
||||||
|
Assert.Equal("worker pipe disconnected", fault.Message);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void GetSnapshot_RedactsSecretsFromSessionAndFaultFields()
|
||||||
|
{
|
||||||
|
SessionRegistry registry = new();
|
||||||
|
GatewaySession session = CreateSession(
|
||||||
|
"session-redacted",
|
||||||
|
"Bearer mxgw_admin_super-secret",
|
||||||
|
DateTimeOffset.Parse("2026-04-26T10:00:00Z"),
|
||||||
|
clientSessionName: "password=hunter2",
|
||||||
|
clientCorrelationId: "token=abc123");
|
||||||
|
session.MarkFaulted("secret=credential-value");
|
||||||
|
registry.TryAdd(session);
|
||||||
|
using GatewayMetrics metrics = new();
|
||||||
|
DashboardSnapshotService service = CreateService(registry, metrics);
|
||||||
|
|
||||||
|
DashboardSnapshot snapshot = service.GetSnapshot();
|
||||||
|
|
||||||
|
DashboardSessionSummary summary = Assert.Single(snapshot.Sessions);
|
||||||
|
Assert.Equal("Bearer mxgw_admin_[redacted]", summary.ClientIdentity);
|
||||||
|
Assert.Equal("[redacted]", summary.ClientSessionName);
|
||||||
|
Assert.Equal("[redacted]", summary.ClientCorrelationId);
|
||||||
|
Assert.Equal("[redacted]", summary.LastFault);
|
||||||
|
Assert.Equal("[redacted]", Assert.Single(snapshot.Faults).Message);
|
||||||
|
Assert.Equal("[redacted]", snapshot.Configuration.Authentication.PepperSecretName);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void GetSnapshot_DoesNotMutateSessionOrWorkerState()
|
||||||
|
{
|
||||||
|
SessionRegistry registry = new();
|
||||||
|
GatewaySession session = CreateSession(
|
||||||
|
"session-active",
|
||||||
|
"client-one",
|
||||||
|
DateTimeOffset.Parse("2026-04-26T10:00:00Z"));
|
||||||
|
FakeWorkerClient workerClient = new("session-active", 1201, WorkerClientState.Ready);
|
||||||
|
session.AttachWorkerClient(workerClient);
|
||||||
|
session.MarkReady();
|
||||||
|
registry.TryAdd(session);
|
||||||
|
using GatewayMetrics metrics = new();
|
||||||
|
DashboardSnapshotService service = CreateService(registry, metrics);
|
||||||
|
|
||||||
|
service.GetSnapshot();
|
||||||
|
service.GetSnapshot();
|
||||||
|
|
||||||
|
Assert.Equal(1, registry.ActiveCount);
|
||||||
|
Assert.Equal(SessionState.Ready, session.State);
|
||||||
|
Assert.Equal(WorkerClientState.Ready, workerClient.State);
|
||||||
|
Assert.Equal(0, workerClient.StartCount);
|
||||||
|
Assert.Equal(0, workerClient.ShutdownCount);
|
||||||
|
Assert.Equal(0, workerClient.KillCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void GetSnapshot_AppliesRecentSessionAndFaultLimits()
|
||||||
|
{
|
||||||
|
SessionRegistry registry = new();
|
||||||
|
GatewaySession olderSession = CreateSession(
|
||||||
|
"session-older",
|
||||||
|
"client-one",
|
||||||
|
DateTimeOffset.Parse("2026-04-26T10:00:00Z"));
|
||||||
|
GatewaySession newerSession = CreateSession(
|
||||||
|
"session-newer",
|
||||||
|
"client-two",
|
||||||
|
DateTimeOffset.Parse("2026-04-26T10:01:00Z"));
|
||||||
|
olderSession.MarkFaulted("older fault");
|
||||||
|
newerSession.MarkFaulted("newer fault");
|
||||||
|
registry.TryAdd(olderSession);
|
||||||
|
registry.TryAdd(newerSession);
|
||||||
|
using GatewayMetrics metrics = new();
|
||||||
|
DashboardSnapshotService service = CreateService(
|
||||||
|
registry,
|
||||||
|
metrics,
|
||||||
|
new GatewayOptions
|
||||||
|
{
|
||||||
|
Dashboard = new DashboardOptions
|
||||||
|
{
|
||||||
|
SnapshotIntervalMilliseconds = 1,
|
||||||
|
RecentSessionLimit = 1,
|
||||||
|
RecentFaultLimit = 1,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
DashboardSnapshot snapshot = service.GetSnapshot();
|
||||||
|
|
||||||
|
Assert.Equal("session-newer", Assert.Single(snapshot.Sessions).SessionId);
|
||||||
|
Assert.Equal("session-newer", Assert.Single(snapshot.Faults).SessionId);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task WatchSnapshotsAsync_WhenSubscriberCancels_DisposesCleanly()
|
||||||
|
{
|
||||||
|
using GatewayMetrics metrics = new();
|
||||||
|
DashboardSnapshotService service = CreateService(
|
||||||
|
new SessionRegistry(),
|
||||||
|
metrics,
|
||||||
|
new GatewayOptions
|
||||||
|
{
|
||||||
|
Dashboard = new DashboardOptions
|
||||||
|
{
|
||||||
|
SnapshotIntervalMilliseconds = 1,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
using CancellationTokenSource cancellation = new();
|
||||||
|
await using IAsyncEnumerator<DashboardSnapshot> enumerator = service
|
||||||
|
.WatchSnapshotsAsync(cancellation.Token)
|
||||||
|
.GetAsyncEnumerator();
|
||||||
|
|
||||||
|
Assert.True(await enumerator.MoveNextAsync().AsTask().WaitAsync(TimeSpan.FromSeconds(1)));
|
||||||
|
await cancellation.CancelAsync();
|
||||||
|
bool hasNext = await enumerator.MoveNextAsync().AsTask().WaitAsync(TimeSpan.FromSeconds(1));
|
||||||
|
|
||||||
|
Assert.False(hasNext);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static DashboardSnapshotService CreateService(
|
||||||
|
SessionRegistry registry,
|
||||||
|
GatewayMetrics metrics,
|
||||||
|
GatewayOptions? options = null)
|
||||||
|
{
|
||||||
|
GatewayOptions resolvedOptions = options ?? new GatewayOptions
|
||||||
|
{
|
||||||
|
Dashboard = new DashboardOptions
|
||||||
|
{
|
||||||
|
SnapshotIntervalMilliseconds = 1,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
GatewayConfigurationProvider configurationProvider = new(Options.Create(resolvedOptions));
|
||||||
|
|
||||||
|
return new DashboardSnapshotService(
|
||||||
|
registry,
|
||||||
|
metrics,
|
||||||
|
configurationProvider,
|
||||||
|
Options.Create(resolvedOptions));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static GatewaySession CreateSession(
|
||||||
|
string sessionId,
|
||||||
|
string? clientIdentity,
|
||||||
|
DateTimeOffset openedAt,
|
||||||
|
string? clientSessionName = "test-session",
|
||||||
|
string? clientCorrelationId = "client-correlation")
|
||||||
|
{
|
||||||
|
return new GatewaySession(
|
||||||
|
sessionId,
|
||||||
|
"mxaccess",
|
||||||
|
$"mxaccess-gateway-1-{sessionId}",
|
||||||
|
"nonce",
|
||||||
|
clientIdentity,
|
||||||
|
clientSessionName,
|
||||||
|
clientCorrelationId,
|
||||||
|
TimeSpan.FromSeconds(30),
|
||||||
|
TimeSpan.FromSeconds(5),
|
||||||
|
TimeSpan.FromSeconds(5),
|
||||||
|
openedAt);
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class FakeWorkerClient(
|
||||||
|
string sessionId,
|
||||||
|
int? processId,
|
||||||
|
WorkerClientState state) : IWorkerClient
|
||||||
|
{
|
||||||
|
public string SessionId { get; } = sessionId;
|
||||||
|
|
||||||
|
public int? ProcessId { get; } = processId;
|
||||||
|
|
||||||
|
public WorkerClientState State { get; private set; } = state;
|
||||||
|
|
||||||
|
public DateTimeOffset LastHeartbeatAt { get; } = DateTimeOffset.Parse("2026-04-26T10:02:00Z");
|
||||||
|
|
||||||
|
public int StartCount { get; private set; }
|
||||||
|
|
||||||
|
public int ShutdownCount { get; private set; }
|
||||||
|
|
||||||
|
public int KillCount { get; private set; }
|
||||||
|
|
||||||
|
public Task StartAsync(CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
StartCount++;
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<WorkerCommandReply> InvokeAsync(
|
||||||
|
WorkerCommand command,
|
||||||
|
TimeSpan timeout,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
return Task.FromResult(new WorkerCommandReply());
|
||||||
|
}
|
||||||
|
|
||||||
|
public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
|
||||||
|
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
await Task.CompletedTask;
|
||||||
|
yield break;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task ShutdownAsync(
|
||||||
|
TimeSpan timeout,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
ShutdownCount++;
|
||||||
|
State = WorkerClientState.Closed;
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Kill(string reason)
|
||||||
|
{
|
||||||
|
KillCount++;
|
||||||
|
State = WorkerClientState.Faulted;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ValueTask DisposeAsync()
|
||||||
|
{
|
||||||
|
return ValueTask.CompletedTask;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,383 @@
|
|||||||
|
using System.Runtime.CompilerServices;
|
||||||
|
using Grpc.Core;
|
||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
using Microsoft.Extensions.Options;
|
||||||
|
using MxGateway.Contracts;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
using MxGateway.Server.Configuration;
|
||||||
|
using MxGateway.Server.Grpc;
|
||||||
|
using MxGateway.Server.Metrics;
|
||||||
|
using MxGateway.Server.Sessions;
|
||||||
|
using MxGateway.Server.Workers;
|
||||||
|
|
||||||
|
namespace MxGateway.Tests.Gateway.Grpc;
|
||||||
|
|
||||||
|
public sealed class EventStreamServiceTests
|
||||||
|
{
|
||||||
|
private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(5);
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task StreamEventsAsync_YieldsEventsInWorkerOrder()
|
||||||
|
{
|
||||||
|
FakeWorkerClient workerClient = new();
|
||||||
|
GatewaySession session = CreateReadySession(workerClient);
|
||||||
|
FakeSessionManager sessionManager = new(session);
|
||||||
|
using GatewayMetrics metrics = new();
|
||||||
|
EventStreamService service = CreateService(sessionManager, metrics: metrics);
|
||||||
|
workerClient.Events.Add(CreateWorkerEvent(sequence: 10, MxEventFamily.OnDataChange));
|
||||||
|
workerClient.Events.Add(CreateWorkerEvent(sequence: 11, MxEventFamily.OnWriteComplete));
|
||||||
|
workerClient.CompleteAfterConfiguredEvents = true;
|
||||||
|
|
||||||
|
List<MxEvent> events = await CollectEventsAsync(service, session.SessionId);
|
||||||
|
|
||||||
|
Assert.Equal([10UL, 11UL], events.Select(mxEvent => mxEvent.WorkerSequence).ToArray());
|
||||||
|
Assert.Equal(MxEventFamily.OnDataChange, events[0].Family);
|
||||||
|
Assert.Equal(MxEventFamily.OnWriteComplete, events[1].Family);
|
||||||
|
Assert.Equal(1, metrics.GetSnapshot().StreamDisconnects);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task StreamEventsAsync_WhenSecondSubscriberStarts_RejectsClearly()
|
||||||
|
{
|
||||||
|
FakeWorkerClient workerClient = new();
|
||||||
|
GatewaySession session = CreateReadySession(workerClient);
|
||||||
|
EventStreamService service = CreateService(new FakeSessionManager(session));
|
||||||
|
using CancellationTokenSource firstSubscriberCancellation = new();
|
||||||
|
await using IAsyncEnumerator<MxEvent> firstSubscriber = service
|
||||||
|
.StreamEventsAsync(CreateRequest(session.SessionId), firstSubscriberCancellation.Token)
|
||||||
|
.GetAsyncEnumerator(firstSubscriberCancellation.Token);
|
||||||
|
Task<bool> firstMoveTask = firstSubscriber.MoveNextAsync().AsTask();
|
||||||
|
|
||||||
|
await WaitUntilAsync(() => session.ActiveEventSubscriberCount == 1);
|
||||||
|
await using IAsyncEnumerator<MxEvent> secondSubscriber = service
|
||||||
|
.StreamEventsAsync(CreateRequest(session.SessionId), CancellationToken.None)
|
||||||
|
.GetAsyncEnumerator();
|
||||||
|
|
||||||
|
SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(
|
||||||
|
async () => await secondSubscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout));
|
||||||
|
|
||||||
|
Assert.Equal(SessionManagerErrorCode.EventSubscriberAlreadyActive, exception.ErrorCode);
|
||||||
|
await firstSubscriberCancellation.CancelAsync();
|
||||||
|
await Assert.ThrowsAnyAsync<OperationCanceledException>(
|
||||||
|
async () => await firstMoveTask.WaitAsync(TestTimeout));
|
||||||
|
await firstSubscriber.DisposeAsync();
|
||||||
|
await WaitUntilAsync(() => session.ActiveEventSubscriberCount == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task StreamEventsAsync_WhenCanceled_DetachesSubscriber()
|
||||||
|
{
|
||||||
|
FakeWorkerClient workerClient = new();
|
||||||
|
GatewaySession session = CreateReadySession(workerClient);
|
||||||
|
EventStreamService service = CreateService(new FakeSessionManager(session));
|
||||||
|
using CancellationTokenSource cancellationTokenSource = new();
|
||||||
|
await using IAsyncEnumerator<MxEvent> subscriber = service
|
||||||
|
.StreamEventsAsync(CreateRequest(session.SessionId), cancellationTokenSource.Token)
|
||||||
|
.GetAsyncEnumerator(cancellationTokenSource.Token);
|
||||||
|
Task<bool> moveTask = subscriber.MoveNextAsync().AsTask();
|
||||||
|
|
||||||
|
await WaitUntilAsync(() => session.ActiveEventSubscriberCount == 1);
|
||||||
|
await cancellationTokenSource.CancelAsync();
|
||||||
|
await Assert.ThrowsAnyAsync<OperationCanceledException>(
|
||||||
|
async () => await moveTask.WaitAsync(TestTimeout));
|
||||||
|
await subscriber.DisposeAsync();
|
||||||
|
|
||||||
|
await WaitUntilAsync(() => session.ActiveEventSubscriberCount == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task StreamEventsAsync_WhenStreamQueueOverflows_FaultsSessionAndReportsOverflow()
|
||||||
|
{
|
||||||
|
FakeWorkerClient workerClient = new();
|
||||||
|
GatewaySession session = CreateReadySession(workerClient);
|
||||||
|
using GatewayMetrics metrics = new();
|
||||||
|
EventStreamService service = CreateService(
|
||||||
|
new FakeSessionManager(session),
|
||||||
|
metrics,
|
||||||
|
queueCapacity: 1);
|
||||||
|
workerClient.Events.Add(CreateWorkerEvent(sequence: 1, MxEventFamily.OnDataChange));
|
||||||
|
workerClient.Events.Add(CreateWorkerEvent(sequence: 2, MxEventFamily.OnDataChange));
|
||||||
|
workerClient.Events.Add(CreateWorkerEvent(sequence: 3, MxEventFamily.OnDataChange));
|
||||||
|
workerClient.CompleteAfterConfiguredEvents = true;
|
||||||
|
await using IAsyncEnumerator<MxEvent> subscriber = service
|
||||||
|
.StreamEventsAsync(CreateRequest(session.SessionId), CancellationToken.None)
|
||||||
|
.GetAsyncEnumerator();
|
||||||
|
|
||||||
|
Assert.True(await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout));
|
||||||
|
await WaitUntilAsync(() => session.State == SessionState.Faulted);
|
||||||
|
SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(
|
||||||
|
async () => await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout));
|
||||||
|
|
||||||
|
Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, exception.ErrorCode);
|
||||||
|
Assert.Equal(SessionState.Faulted, session.State);
|
||||||
|
Assert.Equal(1, metrics.GetSnapshot().QueueOverflows);
|
||||||
|
Assert.Equal(1, metrics.GetSnapshot().Faults);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task StreamEventsAsync_DoesNotSynthesizeOperationComplete()
|
||||||
|
{
|
||||||
|
FakeWorkerClient workerClient = new();
|
||||||
|
GatewaySession session = CreateReadySession(workerClient);
|
||||||
|
EventStreamService service = CreateService(new FakeSessionManager(session));
|
||||||
|
workerClient.Events.Add(CreateWorkerEvent(sequence: 10, MxEventFamily.OnWriteComplete));
|
||||||
|
workerClient.CompleteAfterConfiguredEvents = true;
|
||||||
|
|
||||||
|
List<MxEvent> events = await CollectEventsAsync(service, session.SessionId);
|
||||||
|
|
||||||
|
MxEvent mxEvent = Assert.Single(events);
|
||||||
|
Assert.Equal(MxEventFamily.OnWriteComplete, mxEvent.Family);
|
||||||
|
Assert.DoesNotContain(events, candidate => candidate.Family == MxEventFamily.OperationComplete);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task StreamEventsAsync_WhenWorkerEventStreamFaults_PropagatesTerminalFault()
|
||||||
|
{
|
||||||
|
FakeWorkerClient workerClient = new()
|
||||||
|
{
|
||||||
|
TerminalException = new WorkerClientException(
|
||||||
|
WorkerClientErrorCode.WorkerFaulted,
|
||||||
|
"worker terminal fault"),
|
||||||
|
};
|
||||||
|
GatewaySession session = CreateReadySession(workerClient);
|
||||||
|
using GatewayMetrics metrics = new();
|
||||||
|
EventStreamService service = CreateService(new FakeSessionManager(session), metrics);
|
||||||
|
await using IAsyncEnumerator<MxEvent> subscriber = service
|
||||||
|
.StreamEventsAsync(CreateRequest(session.SessionId), CancellationToken.None)
|
||||||
|
.GetAsyncEnumerator();
|
||||||
|
|
||||||
|
WorkerClientException exception = await Assert.ThrowsAsync<WorkerClientException>(
|
||||||
|
async () => await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout));
|
||||||
|
|
||||||
|
Assert.Equal(WorkerClientErrorCode.WorkerFaulted, exception.ErrorCode);
|
||||||
|
Assert.Equal(SessionState.Faulted, session.State);
|
||||||
|
Assert.Equal(1, metrics.GetSnapshot().Faults);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static EventStreamService CreateService(
|
||||||
|
FakeSessionManager sessionManager,
|
||||||
|
GatewayMetrics? metrics = null,
|
||||||
|
int queueCapacity = 8)
|
||||||
|
{
|
||||||
|
return new EventStreamService(
|
||||||
|
sessionManager,
|
||||||
|
Options.Create(new GatewayOptions
|
||||||
|
{
|
||||||
|
Events = new EventOptions
|
||||||
|
{
|
||||||
|
QueueCapacity = queueCapacity,
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
new MxAccessGrpcMapper(),
|
||||||
|
metrics ?? new GatewayMetrics(),
|
||||||
|
NullLogger<EventStreamService>.Instance);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static async Task<List<MxEvent>> CollectEventsAsync(
|
||||||
|
EventStreamService service,
|
||||||
|
string sessionId)
|
||||||
|
{
|
||||||
|
List<MxEvent> events = [];
|
||||||
|
await foreach (MxEvent mxEvent in service
|
||||||
|
.StreamEventsAsync(CreateRequest(sessionId), CancellationToken.None)
|
||||||
|
.WithCancellation(CancellationToken.None))
|
||||||
|
{
|
||||||
|
events.Add(mxEvent);
|
||||||
|
}
|
||||||
|
|
||||||
|
return events;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static StreamEventsRequest CreateRequest(string sessionId)
|
||||||
|
{
|
||||||
|
return new StreamEventsRequest
|
||||||
|
{
|
||||||
|
SessionId = sessionId,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static GatewaySession CreateReadySession(FakeWorkerClient workerClient)
|
||||||
|
{
|
||||||
|
GatewaySession session = new(
|
||||||
|
"session-events",
|
||||||
|
GatewayContractInfo.DefaultBackendName,
|
||||||
|
"pipe",
|
||||||
|
"nonce",
|
||||||
|
"client",
|
||||||
|
"client-session",
|
||||||
|
"client-correlation",
|
||||||
|
TimeSpan.FromSeconds(30),
|
||||||
|
TimeSpan.FromSeconds(30),
|
||||||
|
TimeSpan.FromSeconds(10),
|
||||||
|
DateTimeOffset.UtcNow);
|
||||||
|
session.AttachWorkerClient(workerClient);
|
||||||
|
session.MarkReady();
|
||||||
|
|
||||||
|
return session;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static WorkerEvent CreateWorkerEvent(
|
||||||
|
ulong sequence,
|
||||||
|
MxEventFamily family)
|
||||||
|
{
|
||||||
|
MxEvent mxEvent = new()
|
||||||
|
{
|
||||||
|
SessionId = "session-events",
|
||||||
|
Family = family,
|
||||||
|
WorkerSequence = sequence,
|
||||||
|
};
|
||||||
|
|
||||||
|
switch (family)
|
||||||
|
{
|
||||||
|
case MxEventFamily.OnDataChange:
|
||||||
|
mxEvent.OnDataChange = new OnDataChangeEvent();
|
||||||
|
break;
|
||||||
|
case MxEventFamily.OnWriteComplete:
|
||||||
|
mxEvent.OnWriteComplete = new OnWriteCompleteEvent();
|
||||||
|
break;
|
||||||
|
case MxEventFamily.OperationComplete:
|
||||||
|
mxEvent.OperationComplete = new OperationCompleteEvent();
|
||||||
|
break;
|
||||||
|
case MxEventFamily.OnBufferedDataChange:
|
||||||
|
mxEvent.OnBufferedDataChange = new OnBufferedDataChangeEvent();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
return new WorkerEvent
|
||||||
|
{
|
||||||
|
Event = mxEvent,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static async Task WaitUntilAsync(Func<bool> predicate)
|
||||||
|
{
|
||||||
|
using CancellationTokenSource cancellationTokenSource = new(TestTimeout);
|
||||||
|
while (!predicate())
|
||||||
|
{
|
||||||
|
await Task.Delay(TimeSpan.FromMilliseconds(10), cancellationTokenSource.Token);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class FakeSessionManager(GatewaySession session) : ISessionManager
|
||||||
|
{
|
||||||
|
public Task<GatewaySession> OpenSessionAsync(
|
||||||
|
SessionOpenRequest request,
|
||||||
|
string? clientIdentity,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
return Task.FromResult(session);
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool TryGetSession(
|
||||||
|
string sessionId,
|
||||||
|
out GatewaySession gatewaySession)
|
||||||
|
{
|
||||||
|
gatewaySession = session;
|
||||||
|
return string.Equals(sessionId, session.SessionId, StringComparison.Ordinal);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<WorkerCommandReply> InvokeAsync(
|
||||||
|
string sessionId,
|
||||||
|
WorkerCommand command,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
return Task.FromResult(new WorkerCommandReply());
|
||||||
|
}
|
||||||
|
|
||||||
|
public IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
|
||||||
|
string sessionId,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
return session.ReadEventsAsync(cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<SessionCloseResult> CloseSessionAsync(
|
||||||
|
string sessionId,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
return Task.FromResult(new SessionCloseResult(sessionId, SessionState.Closed, AlreadyClosed: false));
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<int> CloseExpiredLeasesAsync(
|
||||||
|
DateTimeOffset now,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
return Task.FromResult(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task ShutdownAsync(CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class FakeWorkerClient : IWorkerClient
|
||||||
|
{
|
||||||
|
public List<WorkerEvent> Events { get; } = [];
|
||||||
|
|
||||||
|
public bool CompleteAfterConfiguredEvents { get; set; }
|
||||||
|
|
||||||
|
public Exception? TerminalException { get; init; }
|
||||||
|
|
||||||
|
public string SessionId { get; } = "session-events";
|
||||||
|
|
||||||
|
public int? ProcessId { get; } = 4321;
|
||||||
|
|
||||||
|
public WorkerClientState State { get; private set; } = WorkerClientState.Ready;
|
||||||
|
|
||||||
|
public DateTimeOffset LastHeartbeatAt { get; } = DateTimeOffset.UtcNow;
|
||||||
|
|
||||||
|
public Task StartAsync(CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<WorkerCommandReply> InvokeAsync(
|
||||||
|
WorkerCommand command,
|
||||||
|
TimeSpan timeout,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
return Task.FromResult(new WorkerCommandReply());
|
||||||
|
}
|
||||||
|
|
||||||
|
public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
|
||||||
|
[EnumeratorCancellation] CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
foreach (WorkerEvent workerEvent in Events)
|
||||||
|
{
|
||||||
|
cancellationToken.ThrowIfCancellationRequested();
|
||||||
|
yield return workerEvent;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (TerminalException is not null)
|
||||||
|
{
|
||||||
|
throw TerminalException;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (CompleteAfterConfiguredEvents)
|
||||||
|
{
|
||||||
|
yield break;
|
||||||
|
}
|
||||||
|
|
||||||
|
await Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task ShutdownAsync(
|
||||||
|
TimeSpan timeout,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
State = WorkerClientState.Closed;
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Kill(string reason)
|
||||||
|
{
|
||||||
|
State = WorkerClientState.Faulted;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ValueTask DisposeAsync()
|
||||||
|
{
|
||||||
|
return ValueTask.CompletedTask;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -184,6 +184,7 @@ public sealed class MxAccessGatewayServiceTests
|
|||||||
identityAccessor ?? new GatewayRequestIdentityAccessor(),
|
identityAccessor ?? new GatewayRequestIdentityAccessor(),
|
||||||
new MxAccessGrpcRequestValidator(),
|
new MxAccessGrpcRequestValidator(),
|
||||||
new MxAccessGrpcMapper(),
|
new MxAccessGrpcMapper(),
|
||||||
|
new FakeEventStreamService(sessionManager),
|
||||||
NullLogger<MxAccessGatewayService>.Instance);
|
NullLogger<MxAccessGatewayService>.Instance);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -275,6 +276,11 @@ public sealed class MxAccessGatewayServiceTests
|
|||||||
|
|
||||||
public List<WorkerEvent> Events { get; } = [];
|
public List<WorkerEvent> Events { get; } = [];
|
||||||
|
|
||||||
|
public void RecordReadEventsSessionId(string sessionId)
|
||||||
|
{
|
||||||
|
LastReadEventsSessionId = sessionId;
|
||||||
|
}
|
||||||
|
|
||||||
public Task<GatewaySession> OpenSessionAsync(
|
public Task<GatewaySession> OpenSessionAsync(
|
||||||
SessionOpenRequest request,
|
SessionOpenRequest request,
|
||||||
string? clientIdentity,
|
string? clientIdentity,
|
||||||
@@ -343,6 +349,27 @@ public sealed class MxAccessGatewayServiceTests
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private sealed class FakeEventStreamService(FakeSessionManager sessionManager) : IEventStreamService
|
||||||
|
{
|
||||||
|
public async IAsyncEnumerable<MxEvent> StreamEventsAsync(
|
||||||
|
StreamEventsRequest request,
|
||||||
|
[EnumeratorCancellation] CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
sessionManager.RecordReadEventsSessionId(request.SessionId);
|
||||||
|
foreach (WorkerEvent workerEvent in sessionManager.Events)
|
||||||
|
{
|
||||||
|
cancellationToken.ThrowIfCancellationRequested();
|
||||||
|
await Task.Yield();
|
||||||
|
if (workerEvent.Event.WorkerSequence <= request.AfterWorkerSequence)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
yield return workerEvent.Event;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private sealed class FakeWorkerClient(int processId) : IWorkerClient
|
private sealed class FakeWorkerClient(int processId) : IWorkerClient
|
||||||
{
|
{
|
||||||
public string SessionId { get; } = "session-1";
|
public string SessionId { get; } = "session-1";
|
||||||
|
|||||||
@@ -109,6 +109,32 @@ public sealed class WorkerClientTests
|
|||||||
Assert.Equal(MxEventFamily.OperationComplete, events.Current.Event.Family);
|
Assert.Equal(MxEventFamily.OperationComplete, events.Current.Event.Family);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadLoop_WhenEventQueueOverflows_FaultsClient()
|
||||||
|
{
|
||||||
|
await using PipePair pipePair = await PipePair.CreateAsync();
|
||||||
|
await using WorkerClient client = CreateClient(
|
||||||
|
pipePair,
|
||||||
|
new WorkerClientOptions
|
||||||
|
{
|
||||||
|
EventChannelCapacity = 1,
|
||||||
|
HeartbeatGrace = TimeSpan.FromSeconds(30),
|
||||||
|
HeartbeatCheckInterval = TimeSpan.FromSeconds(30),
|
||||||
|
});
|
||||||
|
await CompleteHandshakeAsync(client, pipePair);
|
||||||
|
|
||||||
|
await pipePair.WorkerWriter.WriteAsync(
|
||||||
|
CreateEventEnvelope(sequence: 11, MxEventFamily.OnDataChange));
|
||||||
|
await pipePair.WorkerWriter.WriteAsync(
|
||||||
|
CreateEventEnvelope(sequence: 12, MxEventFamily.OnDataChange));
|
||||||
|
|
||||||
|
await WaitUntilAsync(
|
||||||
|
() => client.State == WorkerClientState.Faulted,
|
||||||
|
TestTimeout);
|
||||||
|
|
||||||
|
Assert.Equal(WorkerClientState.Faulted, client.State);
|
||||||
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
public async Task ReadLoop_WhenPipeDisconnects_FaultsClient()
|
public async Task ReadLoop_WhenPipeDisconnects_FaultsClient()
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user