Compare commits

...

5 Commits

Author SHA1 Message Date
Joseph Doherty 77eac95f33 Issue #14: implement event streaming and backpressure 2026-04-26 17:59:37 -04:00
dohertj2 015fa1f50d Merge pull request #73 from agent-1/issue-15-implement-dashboard-snapshot-service
Issue #15: implement dashboard snapshot service
2026-04-26 17:56:01 -04:00
dohertj2 dede407304 Merge pull request #72 from agent-2/issue-25-implement-sta-command-dispatcher
Issue #25: implement STA command dispatcher
2026-04-26 17:53:32 -04:00
Joseph Doherty 0d96963c99 Merge remote-tracking branch 'origin/main' into agent-1/issue-15-implement-dashboard-snapshot-service 2026-04-26 17:52:58 -04:00
Joseph Doherty 3661420f0a Issue #15: implement dashboard snapshot service 2026-04-26 17:49:59 -04:00
22 changed files with 1303 additions and 35 deletions
+23 -10
View File
@@ -206,13 +206,23 @@ accounting and a clear fan-out policy.
Behavior:
1. Validate session id and authorize event access.
2. Attach a stream cursor to the session event channel.
3. Send events in worker sequence order.
4. Stop on client cancellation, session close, or session fault.
5. Emit a terminal status when the session faults if gRPC status alone cannot
2. Attach the single active subscriber lease for the session.
3. Read worker events into a bounded public stream queue.
4. Send events in worker sequence order.
5. Stop on client cancellation, session close, or session fault.
6. Emit a terminal status when the session faults if gRPC status alone cannot
preserve the required details.
The gateway must not reorder events from one worker.
`EventStreamService` owns subscriber tracking and public stream backpressure.
The default policy allows one active subscriber per session. A second subscriber
is rejected with `EventSubscriberAlreadyActive`. Stream cancellation releases
the subscriber lease so a later stream can attach to the session.
The gateway must not reorder events from one worker. `EventStreamService` writes
mapped events to a bounded first-in, first-out queue and faults the session with
`EventQueueOverflow` if the queue fills. The gateway does not synthesize
`OperationComplete`; it forwards that family only when the worker reports a
native MXAccess `OperationComplete` event.
## Web Dashboard
@@ -584,7 +594,8 @@ worker MXAccess event
-> worker outbound event queue
-> worker pipe writer
-> gateway read loop
-> session event channel
-> worker client event queue
-> EventStreamService bounded stream queue
-> gRPC StreamEvents
```
@@ -598,13 +609,15 @@ The gateway should record:
Default backpressure policy for parity testing should be fail-fast:
1. If the session event channel fills, fault the session.
1. If the worker client event queue fills, fault the worker client.
2. If the public stream queue fills, fault the gateway session.
2. Preserve the overflow details in logs and metrics.
3. Do not silently drop data-change events.
Do not set a production event-rate target before measurement. Emit event rate,
queue depth, stream send latency, and overflow metrics. Later production modes
may support explicit coalescing by item handle as an opt-in behavior.
Do not set a production event-rate target before measurement. `GatewayMetrics`
records received event counts by family, queue depth, stream disconnects, and
overflow counts. Later production modes may support explicit coalescing by item
handle as an opt-in behavior.
The gateway should not synthesize `OperationComplete` from write completion,
command replies, ASB completion queues, or completion-only status frames. Forward
+17 -12
View File
@@ -107,6 +107,8 @@ worker, correlation, command, and client identity fields with redaction applied
before values enter log state. `GatewayMetrics` exposes counters, gauges, and
histograms through .NET `Meter` and a snapshot API that dashboard services can
project without binding to a metrics exporter.
`DashboardSnapshotService` projects sessions, workers, metrics, faults, and
effective configuration into immutable DTOs for read-only dashboard rendering.
### Worker Process
@@ -518,11 +520,7 @@ Worker policy:
- bounded outbound event channel,
- never block MXAccess event handler on pipe writes,
- if the outbound channel is full, apply configured policy:
- disconnect session,
- drop oldest low-priority data-change events,
- coalesce data changes by item handle,
- or block briefly then fault.
- fail the worker session when the outbound channel is full.
For full parity testing, default should be fail-fast rather than silent drop.
For production high-rate telemetry, add explicit coalescing modes.
@@ -531,9 +529,15 @@ Gateway policy:
- one event sequencer per session,
- preserve per-session event order,
- support multiple client event subscribers only if explicitly required,
- apply backpressure from slow gRPC streams,
- disconnect or coalesce according to client-selected mode.
- allow one active client event subscriber per session,
- reject a second subscriber with a clear session error,
- use a bounded `EventStreamService` queue between worker events and gRPC
writes,
- fault the session when the bounded stream queue overflows,
- detach the subscriber when the stream is canceled.
The gateway forwards only events reported by the worker. It does not synthesize
`OperationComplete` from write completion, command replies, or status frames.
## Isolation And Fault Handling
@@ -855,10 +859,11 @@ translation code testable.
The gateway maps `MxAccessGateway` to `MxAccessGatewayService`. The service
implements `OpenSession`, `CloseSession`, `Invoke`, and `StreamEvents` by
validating public requests, delegating session work to `ISessionManager`, and
using explicit mapper code for public-to-worker commands, worker replies, and
events. Missing sessions and transport failures return gRPC status errors;
worker command replies preserve MXAccess HRESULT and status details in the
public reply.
using explicit mapper code for public-to-worker commands and worker replies.
`StreamEvents` delegates subscriber ownership, ordering, and backpressure to
`EventStreamService`. Missing sessions and transport failures return gRPC
status errors; worker command replies preserve MXAccess HRESULT and status
details in the public reply.
## C# Worker Versus C++ Worker
@@ -0,0 +1,9 @@
namespace MxGateway.Server.Dashboard;
public sealed record DashboardFaultSummary(
string Source,
string? SessionId,
int? WorkerProcessId,
string State,
string Message,
DateTimeOffset ObservedAt);
@@ -0,0 +1,6 @@
namespace MxGateway.Server.Dashboard;
public sealed record DashboardMetricSummary(
string Name,
long Value,
string? Dimension = null);
@@ -0,0 +1,34 @@
using MxGateway.Server.Diagnostics;
namespace MxGateway.Server.Dashboard;
internal static class DashboardRedactor
{
private static readonly string[] SensitiveTextMarkers =
[
"apikey",
"api_key",
"authorization",
"credential",
"password",
"secret",
"token",
];
public static string? Redact(string? value)
{
if (string.IsNullOrWhiteSpace(value))
{
return value;
}
if (value.Contains("mxgw_", StringComparison.OrdinalIgnoreCase))
{
return GatewayLogRedactor.RedactClientIdentity(value);
}
return SensitiveTextMarkers.Any(marker => value.Contains(marker, StringComparison.OrdinalIgnoreCase))
? GatewayLogRedactor.RedactedValue
: value;
}
}
@@ -0,0 +1,11 @@
namespace MxGateway.Server.Dashboard;
public static class DashboardServiceCollectionExtensions
{
public static IServiceCollection AddGatewayDashboard(this IServiceCollection services)
{
services.AddSingleton<IDashboardSnapshotService, DashboardSnapshotService>();
return services;
}
}
@@ -0,0 +1,19 @@
using MxGateway.Contracts.Proto;
using MxGateway.Server.Workers;
namespace MxGateway.Server.Dashboard;
public sealed record DashboardSessionSummary(
string SessionId,
string BackendName,
SessionState State,
string? ClientIdentity,
string? ClientSessionName,
string? ClientCorrelationId,
DateTimeOffset OpenedAt,
DateTimeOffset LastClientActivityAt,
DateTimeOffset? LeaseExpiresAt,
int? WorkerProcessId,
WorkerClientState? WorkerState,
DateTimeOffset? LastWorkerHeartbeatAt,
string? LastFault);
@@ -0,0 +1,15 @@
using MxGateway.Server.Configuration;
namespace MxGateway.Server.Dashboard;
public sealed record DashboardSnapshot(
DateTimeOffset GeneratedAt,
DateTimeOffset GatewayStartedAt,
TimeSpan GatewayUptime,
string GatewayStatus,
string GatewayVersion,
IReadOnlyList<DashboardSessionSummary> Sessions,
IReadOnlyList<DashboardWorkerSummary> Workers,
IReadOnlyList<DashboardMetricSummary> Metrics,
IReadOnlyList<DashboardFaultSummary> Faults,
EffectiveGatewayConfiguration Configuration);
@@ -0,0 +1,196 @@
using System.Runtime.CompilerServices;
using Microsoft.Extensions.Options;
using MxGateway.Server.Configuration;
using MxGateway.Server.Metrics;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
namespace MxGateway.Server.Dashboard;
public sealed class DashboardSnapshotService : IDashboardSnapshotService
{
private const string HealthyStatus = "Healthy";
private readonly ISessionRegistry _sessionRegistry;
private readonly GatewayMetrics _metrics;
private readonly IGatewayConfigurationProvider _configurationProvider;
private readonly TimeProvider _timeProvider;
private readonly DateTimeOffset _gatewayStartedAt;
private readonly TimeSpan _snapshotInterval;
private readonly int _recentFaultLimit;
private readonly int _recentSessionLimit;
public DashboardSnapshotService(
ISessionRegistry sessionRegistry,
GatewayMetrics metrics,
IGatewayConfigurationProvider configurationProvider,
IOptions<GatewayOptions> options,
TimeProvider? timeProvider = null)
{
_sessionRegistry = sessionRegistry ?? throw new ArgumentNullException(nameof(sessionRegistry));
_metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
_configurationProvider = configurationProvider ?? throw new ArgumentNullException(nameof(configurationProvider));
ArgumentNullException.ThrowIfNull(options);
_timeProvider = timeProvider ?? TimeProvider.System;
_gatewayStartedAt = _timeProvider.GetUtcNow();
_snapshotInterval = TimeSpan.FromMilliseconds(options.Value.Dashboard.SnapshotIntervalMilliseconds);
_recentFaultLimit = options.Value.Dashboard.RecentFaultLimit;
_recentSessionLimit = options.Value.Dashboard.RecentSessionLimit;
}
public DashboardSnapshot GetSnapshot()
{
DateTimeOffset generatedAt = _timeProvider.GetUtcNow();
IReadOnlyList<GatewaySession> sessions = _sessionRegistry.Snapshot()
.OrderByDescending(session => session.OpenedAt)
.ToArray();
IReadOnlyList<DashboardSessionSummary> sessionSummaries = sessions
.Take(ResolveLimit(_recentSessionLimit))
.Select(CreateSessionSummary)
.ToArray();
IReadOnlyList<DashboardWorkerSummary> workerSummaries = sessions
.Where(session => session.WorkerClient is not null)
.Select(CreateWorkerSummary)
.ToArray();
GatewayMetricsSnapshot metricsSnapshot = _metrics.GetSnapshot();
return new DashboardSnapshot(
GeneratedAt: generatedAt,
GatewayStartedAt: _gatewayStartedAt,
GatewayUptime: generatedAt - _gatewayStartedAt,
GatewayStatus: HealthyStatus,
GatewayVersion: typeof(DashboardSnapshotService).Assembly.GetName().Version?.ToString() ?? "unknown",
Sessions: sessionSummaries,
Workers: workerSummaries,
Metrics: CreateMetricSummaries(metricsSnapshot),
Faults: CreateFaultSummaries(sessions, generatedAt),
Configuration: _configurationProvider.GetEffectiveConfiguration());
}
public async IAsyncEnumerable<DashboardSnapshot> WatchSnapshotsAsync(
[EnumeratorCancellation] CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
yield break;
}
yield return GetSnapshot();
using PeriodicTimer timer = new(_snapshotInterval, _timeProvider);
while (true)
{
bool hasNext;
try
{
hasNext = await timer.WaitForNextTickAsync(cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
yield break;
}
if (!hasNext)
{
yield break;
}
yield return GetSnapshot();
}
}
private static DashboardSessionSummary CreateSessionSummary(GatewaySession session)
{
IWorkerClient? workerClient = session.WorkerClient;
return new DashboardSessionSummary(
SessionId: session.SessionId,
BackendName: session.BackendName,
State: session.State,
ClientIdentity: DashboardRedactor.Redact(session.ClientIdentity),
ClientSessionName: DashboardRedactor.Redact(session.ClientSessionName),
ClientCorrelationId: DashboardRedactor.Redact(session.ClientCorrelationId),
OpenedAt: session.OpenedAt,
LastClientActivityAt: session.LastClientActivityAt,
LeaseExpiresAt: session.LeaseExpiresAt,
WorkerProcessId: workerClient?.ProcessId,
WorkerState: workerClient?.State,
LastWorkerHeartbeatAt: workerClient?.LastHeartbeatAt,
LastFault: DashboardRedactor.Redact(session.FinalFault));
}
private static DashboardWorkerSummary CreateWorkerSummary(GatewaySession session)
{
IWorkerClient workerClient = session.WorkerClient!;
return new DashboardWorkerSummary(
SessionId: session.SessionId,
ProcessId: workerClient.ProcessId,
State: workerClient.State,
LastHeartbeatAt: workerClient.LastHeartbeatAt,
LastFault: DashboardRedactor.Redact(session.FinalFault));
}
private static IReadOnlyList<DashboardMetricSummary> CreateMetricSummaries(GatewayMetricsSnapshot snapshot)
{
List<DashboardMetricSummary> metrics =
[
new("mxgateway.sessions.open", snapshot.OpenSessions),
new("mxgateway.workers.running", snapshot.WorkersRunning),
new("mxgateway.events.queue.depth", snapshot.EventQueueDepth),
new("mxgateway.sessions.opened", snapshot.SessionsOpened),
new("mxgateway.sessions.closed", snapshot.SessionsClosed),
new("mxgateway.commands.started", snapshot.CommandsStarted),
new("mxgateway.commands.succeeded", snapshot.CommandsSucceeded),
new("mxgateway.commands.failed", snapshot.CommandsFailed),
new("mxgateway.events.received", snapshot.EventsReceived),
new("mxgateway.queues.overflows", snapshot.QueueOverflows),
new("mxgateway.faults", snapshot.Faults),
new("mxgateway.workers.killed", snapshot.WorkerKills),
new("mxgateway.workers.exited", snapshot.WorkerExits),
new("mxgateway.heartbeats.failed", snapshot.HeartbeatFailures),
new("mxgateway.grpc.streams.disconnected", snapshot.StreamDisconnects),
];
metrics.AddRange(snapshot.CommandFailuresByMethod
.OrderBy(entry => entry.Key, StringComparer.OrdinalIgnoreCase)
.Select(entry => new DashboardMetricSummary("mxgateway.commands.failed", entry.Value, entry.Key)));
metrics.AddRange(snapshot.EventsByFamily
.OrderBy(entry => entry.Key, StringComparer.OrdinalIgnoreCase)
.Select(entry => new DashboardMetricSummary("mxgateway.events.received", entry.Value, entry.Key)));
return metrics;
}
private IReadOnlyList<DashboardFaultSummary> CreateFaultSummaries(
IReadOnlyList<GatewaySession> sessions,
DateTimeOffset generatedAt)
{
return sessions
.Where(HasFault)
.Take(ResolveLimit(_recentFaultLimit))
.Select(session => new DashboardFaultSummary(
Source: session.WorkerClient?.State == WorkerClientState.Faulted ? "Worker" : "Session",
SessionId: session.SessionId,
WorkerProcessId: session.WorkerProcessId,
State: session.WorkerClient?.State == WorkerClientState.Faulted
? WorkerClientState.Faulted.ToString()
: session.State.ToString(),
Message: DashboardRedactor.Redact(session.FinalFault) ?? "Faulted",
ObservedAt: generatedAt))
.ToArray();
}
private static bool HasFault(GatewaySession session)
{
return session.State == MxGateway.Contracts.Proto.SessionState.Faulted
|| session.WorkerClient?.State == WorkerClientState.Faulted
|| !string.IsNullOrWhiteSpace(session.FinalFault);
}
private static int ResolveLimit(int configuredLimit)
{
return configuredLimit < 0 ? 0 : configuredLimit;
}
}
@@ -0,0 +1,10 @@
using MxGateway.Server.Workers;
namespace MxGateway.Server.Dashboard;
public sealed record DashboardWorkerSummary(
string SessionId,
int? ProcessId,
WorkerClientState State,
DateTimeOffset LastHeartbeatAt,
string? LastFault);
@@ -0,0 +1,8 @@
namespace MxGateway.Server.Dashboard;
public interface IDashboardSnapshotService
{
DashboardSnapshot GetSnapshot();
IAsyncEnumerable<DashboardSnapshot> WatchSnapshotsAsync(CancellationToken cancellationToken);
}
@@ -1,5 +1,6 @@
using MxGateway.Contracts;
using MxGateway.Server.Configuration;
using MxGateway.Server.Dashboard;
using MxGateway.Server.Diagnostics;
using MxGateway.Server.Grpc;
using MxGateway.Server.Metrics;
@@ -34,8 +35,10 @@ public static class GatewayApplication
builder.Services.AddSingleton<GatewayMetrics>();
builder.Services.AddSingleton<MxAccessGrpcMapper>();
builder.Services.AddSingleton<MxAccessGrpcRequestValidator>();
builder.Services.AddSingleton<IEventStreamService, EventStreamService>();
builder.Services.AddWorkerProcessLauncher();
builder.Services.AddGatewaySessions();
builder.Services.AddGatewayDashboard();
return builder;
}
@@ -0,0 +1,140 @@
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Microsoft.Extensions.Options;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Configuration;
using MxGateway.Server.Metrics;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
namespace MxGateway.Server.Grpc;
public sealed class EventStreamService(
ISessionManager sessionManager,
IOptions<GatewayOptions> options,
MxAccessGrpcMapper mapper,
GatewayMetrics metrics,
ILogger<EventStreamService> logger) : IEventStreamService
{
public async IAsyncEnumerable<MxEvent> StreamEventsAsync(
StreamEventsRequest request,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
if (!sessionManager.TryGetSession(request.SessionId, out GatewaySession session))
{
throw new SessionManagerException(
SessionManagerErrorCode.SessionNotFound,
$"Session {request.SessionId} was not found.");
}
using IDisposable subscriber = session.AttachEventSubscriber(
options.Value.Sessions.AllowMultipleEventSubscribers);
using CancellationTokenSource streamCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
int streamQueueDepth = 0;
Channel<MxEvent> eventQueue = Channel.CreateBounded<MxEvent>(
new BoundedChannelOptions(options.Value.Events.QueueCapacity)
{
SingleReader = true,
SingleWriter = true,
FullMode = BoundedChannelFullMode.Wait,
AllowSynchronousContinuations = false,
});
Task producerTask = ProduceEventsAsync(
session,
request.AfterWorkerSequence,
eventQueue.Writer,
() =>
{
int depth = Interlocked.Increment(ref streamQueueDepth);
metrics.SetEventQueueDepth(depth);
},
streamCts.Token);
try
{
await foreach (MxEvent mxEvent in eventQueue.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
{
int depth = Math.Max(0, Interlocked.Decrement(ref streamQueueDepth));
metrics.SetEventQueueDepth(depth);
yield return mxEvent;
}
await producerTask.ConfigureAwait(false);
}
finally
{
await streamCts.CancelAsync().ConfigureAwait(false);
subscriber.Dispose();
metrics.StreamDisconnected("Detached");
try
{
await producerTask.ConfigureAwait(false);
}
catch (OperationCanceledException) when (streamCts.IsCancellationRequested)
{
}
catch (Exception exception)
{
logger.LogDebug(
exception,
"Event stream producer stopped for session {SessionId}.",
request.SessionId);
}
}
}
private async Task ProduceEventsAsync(
GatewaySession session,
ulong afterWorkerSequence,
ChannelWriter<MxEvent> writer,
Action eventQueued,
CancellationToken cancellationToken)
{
try
{
await foreach (WorkerEvent workerEvent in session
.ReadEventsAsync(cancellationToken)
.WithCancellation(cancellationToken)
.ConfigureAwait(false))
{
MxEvent publicEvent = mapper.MapEvent(workerEvent);
if (publicEvent.WorkerSequence <= afterWorkerSequence)
{
continue;
}
if (!writer.TryWrite(publicEvent))
{
string message = $"Session {session.SessionId} event stream queue overflowed.";
session.MarkFaulted(message);
metrics.QueueOverflow("grpc-event-stream");
metrics.Fault(SessionManagerErrorCode.EventQueueOverflow.ToString());
writer.TryComplete(new SessionManagerException(
SessionManagerErrorCode.EventQueueOverflow,
message));
return;
}
eventQueued();
}
writer.TryComplete();
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
writer.TryComplete();
}
catch (Exception exception)
{
if (exception is WorkerClientException)
{
session.MarkFaulted(exception.Message);
metrics.Fault(WorkerClientErrorCode.WorkerFaulted.ToString());
}
writer.TryComplete(exception);
}
}
}
@@ -0,0 +1,10 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Grpc;
public interface IEventStreamService
{
IAsyncEnumerable<MxEvent> StreamEventsAsync(
StreamEventsRequest request,
CancellationToken cancellationToken);
}
@@ -12,6 +12,7 @@ public sealed class MxAccessGatewayService(
IGatewayRequestIdentityAccessor identityAccessor,
MxAccessGrpcRequestValidator requestValidator,
MxAccessGrpcMapper mapper,
IEventStreamService eventStreamService,
ILogger<MxAccessGatewayService> logger) : MxAccessGateway.MxAccessGatewayBase
{
public override async Task<OpenSessionReply> OpenSession(
@@ -102,17 +103,11 @@ public sealed class MxAccessGatewayService(
try
{
requestValidator.ValidateStreamEvents(request);
await foreach (WorkerEvent workerEvent in sessionManager
.ReadEventsAsync(request.SessionId, context.CancellationToken)
await foreach (MxEvent publicEvent in eventStreamService
.StreamEventsAsync(request, context.CancellationToken)
.WithCancellation(context.CancellationToken)
.ConfigureAwait(false))
{
MxEvent publicEvent = mapper.MapEvent(workerEvent);
if (publicEvent.WorkerSequence <= request.AfterWorkerSequence)
{
continue;
}
await responseStream.WriteAsync(publicEvent).ConfigureAwait(false);
}
}
@@ -154,6 +149,8 @@ public sealed class MxAccessGatewayService(
{
SessionManagerErrorCode.SessionNotFound => StatusCode.NotFound,
SessionManagerErrorCode.SessionNotReady => StatusCode.FailedPrecondition,
SessionManagerErrorCode.EventSubscriberAlreadyActive => StatusCode.ResourceExhausted,
SessionManagerErrorCode.EventQueueOverflow => StatusCode.ResourceExhausted,
SessionManagerErrorCode.SessionLimitExceeded => StatusCode.ResourceExhausted,
SessionManagerErrorCode.OpenFailed => StatusCode.Unavailable,
SessionManagerErrorCode.CloseFailed => StatusCode.Unavailable,
@@ -13,6 +13,7 @@ public sealed class GatewaySession
private DateTimeOffset _lastClientActivityAt;
private DateTimeOffset? _leaseExpiresAt;
private bool _closeStarted;
private int _activeEventSubscriberCount;
public GatewaySession(
string sessionId,
@@ -131,6 +132,17 @@ public sealed class GatewaySession
}
}
public int ActiveEventSubscriberCount
{
get
{
lock (_syncRoot)
{
return _activeEventSubscriberCount;
}
}
}
public void AttachWorkerClient(IWorkerClient workerClient)
{
ArgumentNullException.ThrowIfNull(workerClient);
@@ -202,6 +214,29 @@ public sealed class GatewaySession
}
}
public IDisposable AttachEventSubscriber(bool allowMultipleSubscribers)
{
lock (_syncRoot)
{
if (_state != SessionState.Ready || _workerClient?.State != WorkerClientState.Ready)
{
throw new SessionManagerException(
SessionManagerErrorCode.SessionNotReady,
$"Session {SessionId} is not ready for event streaming. Current state is {_state}.");
}
if (!allowMultipleSubscribers && _activeEventSubscriberCount > 0)
{
throw new SessionManagerException(
SessionManagerErrorCode.EventSubscriberAlreadyActive,
$"Session {SessionId} already has an active event stream subscriber.");
}
_activeEventSubscriberCount++;
return new EventSubscriberLease(this);
}
}
public async Task<WorkerCommandReply> InvokeAsync(
WorkerCommand command,
CancellationToken cancellationToken)
@@ -287,4 +322,31 @@ public sealed class GatewaySession
return _workerClient;
}
}
private void DetachEventSubscriber()
{
lock (_syncRoot)
{
if (_activeEventSubscriberCount > 0)
{
_activeEventSubscriberCount--;
}
}
}
private sealed class EventSubscriberLease(GatewaySession session) : IDisposable
{
private bool _disposed;
public void Dispose()
{
if (_disposed)
{
return;
}
session.DetachEventSubscriber();
_disposed = true;
}
}
}
@@ -4,6 +4,8 @@ public enum SessionManagerErrorCode
{
SessionNotFound,
SessionNotReady,
EventSubscriberAlreadyActive,
EventQueueOverflow,
SessionLimitExceeded,
OpenFailed,
CloseFailed,
+7 -5
View File
@@ -29,6 +29,7 @@ public sealed class WorkerClient : IWorkerClient
private WorkerClientState _state;
private DateTimeOffset _lastHeartbeatAt;
private int? _processId;
private int _eventQueueDepth;
private Task? _readLoopTask;
private Task? _writeLoopTask;
private Task? _heartbeatLoopTask;
@@ -197,6 +198,8 @@ public sealed class WorkerClient : IWorkerClient
{
await foreach (WorkerEvent workerEvent in _events.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
{
int queueDepth = Math.Max(0, Interlocked.Decrement(ref _eventQueueDepth));
_metrics?.SetEventQueueDepth(queueDepth);
yield return workerEvent;
}
}
@@ -394,11 +397,6 @@ public sealed class WorkerClient : IWorkerClient
_metrics?.EventReceived(SessionId, workerEvent.Event.Family.ToString());
}
if (!await _events.Writer.WaitToWriteAsync(cancellationToken).ConfigureAwait(false))
{
return;
}
if (!_events.Writer.TryWrite(workerEvent))
{
_metrics?.QueueOverflow("worker-events");
@@ -406,7 +404,11 @@ public sealed class WorkerClient : IWorkerClient
WorkerClientErrorCode.ProtocolViolation,
"Worker event channel rejected an event.",
null);
return;
}
int queueDepth = Interlocked.Increment(ref _eventQueueDepth);
_metrics?.SetEventQueueDepth(queueDepth);
}
private void CompleteCommand(WorkerEnvelope envelope)
@@ -0,0 +1,290 @@
using Microsoft.Extensions.Options;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Configuration;
using MxGateway.Server.Dashboard;
using MxGateway.Server.Metrics;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
namespace MxGateway.Tests.Gateway.Dashboard;
public sealed class DashboardSnapshotServiceTests
{
[Fact]
public void GetSnapshot_WhenRegistryEmpty_ReturnsEmptyOperationalState()
{
using GatewayMetrics metrics = new();
DashboardSnapshotService service = CreateService(new SessionRegistry(), metrics);
DashboardSnapshot snapshot = service.GetSnapshot();
Assert.Empty(snapshot.Sessions);
Assert.Empty(snapshot.Workers);
Assert.Empty(snapshot.Faults);
Assert.Contains(snapshot.Metrics, metric => metric.Name == "mxgateway.sessions.open" && metric.Value == 0);
Assert.Equal("Healthy", snapshot.GatewayStatus);
Assert.NotNull(snapshot.Configuration);
}
[Fact]
public void GetSnapshot_ProjectsActiveAndFaultedSessionsWorkersMetricsAndFaults()
{
SessionRegistry registry = new();
GatewaySession activeSession = CreateSession(
"session-active",
"client-one",
DateTimeOffset.Parse("2026-04-26T10:00:00Z"));
activeSession.AttachWorkerClient(new FakeWorkerClient("session-active", 1201, WorkerClientState.Ready));
activeSession.MarkReady();
GatewaySession faultedSession = CreateSession(
"session-faulted",
"client-two",
DateTimeOffset.Parse("2026-04-26T10:01:00Z"));
faultedSession.AttachWorkerClient(new FakeWorkerClient("session-faulted", 1202, WorkerClientState.Faulted));
faultedSession.MarkFaulted("worker pipe disconnected");
registry.TryAdd(activeSession);
registry.TryAdd(faultedSession);
using GatewayMetrics metrics = new();
metrics.SessionOpened();
metrics.SessionOpened();
metrics.CommandStarted("Register");
metrics.CommandFailed("Register", "WorkerFaulted", TimeSpan.FromMilliseconds(7));
metrics.EventReceived("session-active", "OnDataChange");
metrics.Fault("WorkerFaulted");
DashboardSnapshotService service = CreateService(registry, metrics);
DashboardSnapshot snapshot = service.GetSnapshot();
Assert.Equal(2, snapshot.Sessions.Count);
Assert.Equal("session-faulted", snapshot.Sessions[0].SessionId);
Assert.Equal(SessionState.Faulted, snapshot.Sessions[0].State);
Assert.Equal(2, snapshot.Workers.Count);
Assert.Contains(snapshot.Metrics, metric => metric.Name == "mxgateway.commands.started" && metric.Value == 1);
Assert.Contains(
snapshot.Metrics,
metric => metric.Name == "mxgateway.events.received"
&& metric.Dimension == "OnDataChange"
&& metric.Value == 1);
DashboardFaultSummary fault = Assert.Single(snapshot.Faults);
Assert.Equal("Worker", fault.Source);
Assert.Equal("session-faulted", fault.SessionId);
Assert.Equal("worker pipe disconnected", fault.Message);
}
[Fact]
public void GetSnapshot_RedactsSecretsFromSessionAndFaultFields()
{
SessionRegistry registry = new();
GatewaySession session = CreateSession(
"session-redacted",
"Bearer mxgw_admin_super-secret",
DateTimeOffset.Parse("2026-04-26T10:00:00Z"),
clientSessionName: "password=hunter2",
clientCorrelationId: "token=abc123");
session.MarkFaulted("secret=credential-value");
registry.TryAdd(session);
using GatewayMetrics metrics = new();
DashboardSnapshotService service = CreateService(registry, metrics);
DashboardSnapshot snapshot = service.GetSnapshot();
DashboardSessionSummary summary = Assert.Single(snapshot.Sessions);
Assert.Equal("Bearer mxgw_admin_[redacted]", summary.ClientIdentity);
Assert.Equal("[redacted]", summary.ClientSessionName);
Assert.Equal("[redacted]", summary.ClientCorrelationId);
Assert.Equal("[redacted]", summary.LastFault);
Assert.Equal("[redacted]", Assert.Single(snapshot.Faults).Message);
Assert.Equal("[redacted]", snapshot.Configuration.Authentication.PepperSecretName);
}
[Fact]
public void GetSnapshot_DoesNotMutateSessionOrWorkerState()
{
SessionRegistry registry = new();
GatewaySession session = CreateSession(
"session-active",
"client-one",
DateTimeOffset.Parse("2026-04-26T10:00:00Z"));
FakeWorkerClient workerClient = new("session-active", 1201, WorkerClientState.Ready);
session.AttachWorkerClient(workerClient);
session.MarkReady();
registry.TryAdd(session);
using GatewayMetrics metrics = new();
DashboardSnapshotService service = CreateService(registry, metrics);
service.GetSnapshot();
service.GetSnapshot();
Assert.Equal(1, registry.ActiveCount);
Assert.Equal(SessionState.Ready, session.State);
Assert.Equal(WorkerClientState.Ready, workerClient.State);
Assert.Equal(0, workerClient.StartCount);
Assert.Equal(0, workerClient.ShutdownCount);
Assert.Equal(0, workerClient.KillCount);
}
[Fact]
public void GetSnapshot_AppliesRecentSessionAndFaultLimits()
{
SessionRegistry registry = new();
GatewaySession olderSession = CreateSession(
"session-older",
"client-one",
DateTimeOffset.Parse("2026-04-26T10:00:00Z"));
GatewaySession newerSession = CreateSession(
"session-newer",
"client-two",
DateTimeOffset.Parse("2026-04-26T10:01:00Z"));
olderSession.MarkFaulted("older fault");
newerSession.MarkFaulted("newer fault");
registry.TryAdd(olderSession);
registry.TryAdd(newerSession);
using GatewayMetrics metrics = new();
DashboardSnapshotService service = CreateService(
registry,
metrics,
new GatewayOptions
{
Dashboard = new DashboardOptions
{
SnapshotIntervalMilliseconds = 1,
RecentSessionLimit = 1,
RecentFaultLimit = 1,
},
});
DashboardSnapshot snapshot = service.GetSnapshot();
Assert.Equal("session-newer", Assert.Single(snapshot.Sessions).SessionId);
Assert.Equal("session-newer", Assert.Single(snapshot.Faults).SessionId);
}
[Fact]
public async Task WatchSnapshotsAsync_WhenSubscriberCancels_DisposesCleanly()
{
using GatewayMetrics metrics = new();
DashboardSnapshotService service = CreateService(
new SessionRegistry(),
metrics,
new GatewayOptions
{
Dashboard = new DashboardOptions
{
SnapshotIntervalMilliseconds = 1,
},
});
using CancellationTokenSource cancellation = new();
await using IAsyncEnumerator<DashboardSnapshot> enumerator = service
.WatchSnapshotsAsync(cancellation.Token)
.GetAsyncEnumerator();
Assert.True(await enumerator.MoveNextAsync().AsTask().WaitAsync(TimeSpan.FromSeconds(1)));
await cancellation.CancelAsync();
bool hasNext = await enumerator.MoveNextAsync().AsTask().WaitAsync(TimeSpan.FromSeconds(1));
Assert.False(hasNext);
}
private static DashboardSnapshotService CreateService(
SessionRegistry registry,
GatewayMetrics metrics,
GatewayOptions? options = null)
{
GatewayOptions resolvedOptions = options ?? new GatewayOptions
{
Dashboard = new DashboardOptions
{
SnapshotIntervalMilliseconds = 1,
},
};
GatewayConfigurationProvider configurationProvider = new(Options.Create(resolvedOptions));
return new DashboardSnapshotService(
registry,
metrics,
configurationProvider,
Options.Create(resolvedOptions));
}
private static GatewaySession CreateSession(
string sessionId,
string? clientIdentity,
DateTimeOffset openedAt,
string? clientSessionName = "test-session",
string? clientCorrelationId = "client-correlation")
{
return new GatewaySession(
sessionId,
"mxaccess",
$"mxaccess-gateway-1-{sessionId}",
"nonce",
clientIdentity,
clientSessionName,
clientCorrelationId,
TimeSpan.FromSeconds(30),
TimeSpan.FromSeconds(5),
TimeSpan.FromSeconds(5),
openedAt);
}
private sealed class FakeWorkerClient(
string sessionId,
int? processId,
WorkerClientState state) : IWorkerClient
{
public string SessionId { get; } = sessionId;
public int? ProcessId { get; } = processId;
public WorkerClientState State { get; private set; } = state;
public DateTimeOffset LastHeartbeatAt { get; } = DateTimeOffset.Parse("2026-04-26T10:02:00Z");
public int StartCount { get; private set; }
public int ShutdownCount { get; private set; }
public int KillCount { get; private set; }
public Task StartAsync(CancellationToken cancellationToken)
{
StartCount++;
return Task.CompletedTask;
}
public Task<WorkerCommandReply> InvokeAsync(
WorkerCommand command,
TimeSpan timeout,
CancellationToken cancellationToken)
{
return Task.FromResult(new WorkerCommandReply());
}
public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
{
await Task.CompletedTask;
yield break;
}
public Task ShutdownAsync(
TimeSpan timeout,
CancellationToken cancellationToken)
{
ShutdownCount++;
State = WorkerClientState.Closed;
return Task.CompletedTask;
}
public void Kill(string reason)
{
KillCount++;
State = WorkerClientState.Faulted;
}
public ValueTask DisposeAsync()
{
return ValueTask.CompletedTask;
}
}
}
@@ -0,0 +1,383 @@
using System.Runtime.CompilerServices;
using Grpc.Core;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Configuration;
using MxGateway.Server.Grpc;
using MxGateway.Server.Metrics;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
namespace MxGateway.Tests.Gateway.Grpc;
public sealed class EventStreamServiceTests
{
private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(5);
[Fact]
public async Task StreamEventsAsync_YieldsEventsInWorkerOrder()
{
FakeWorkerClient workerClient = new();
GatewaySession session = CreateReadySession(workerClient);
FakeSessionManager sessionManager = new(session);
using GatewayMetrics metrics = new();
EventStreamService service = CreateService(sessionManager, metrics: metrics);
workerClient.Events.Add(CreateWorkerEvent(sequence: 10, MxEventFamily.OnDataChange));
workerClient.Events.Add(CreateWorkerEvent(sequence: 11, MxEventFamily.OnWriteComplete));
workerClient.CompleteAfterConfiguredEvents = true;
List<MxEvent> events = await CollectEventsAsync(service, session.SessionId);
Assert.Equal([10UL, 11UL], events.Select(mxEvent => mxEvent.WorkerSequence).ToArray());
Assert.Equal(MxEventFamily.OnDataChange, events[0].Family);
Assert.Equal(MxEventFamily.OnWriteComplete, events[1].Family);
Assert.Equal(1, metrics.GetSnapshot().StreamDisconnects);
}
[Fact]
public async Task StreamEventsAsync_WhenSecondSubscriberStarts_RejectsClearly()
{
FakeWorkerClient workerClient = new();
GatewaySession session = CreateReadySession(workerClient);
EventStreamService service = CreateService(new FakeSessionManager(session));
using CancellationTokenSource firstSubscriberCancellation = new();
await using IAsyncEnumerator<MxEvent> firstSubscriber = service
.StreamEventsAsync(CreateRequest(session.SessionId), firstSubscriberCancellation.Token)
.GetAsyncEnumerator(firstSubscriberCancellation.Token);
Task<bool> firstMoveTask = firstSubscriber.MoveNextAsync().AsTask();
await WaitUntilAsync(() => session.ActiveEventSubscriberCount == 1);
await using IAsyncEnumerator<MxEvent> secondSubscriber = service
.StreamEventsAsync(CreateRequest(session.SessionId), CancellationToken.None)
.GetAsyncEnumerator();
SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(
async () => await secondSubscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout));
Assert.Equal(SessionManagerErrorCode.EventSubscriberAlreadyActive, exception.ErrorCode);
await firstSubscriberCancellation.CancelAsync();
await Assert.ThrowsAnyAsync<OperationCanceledException>(
async () => await firstMoveTask.WaitAsync(TestTimeout));
await firstSubscriber.DisposeAsync();
await WaitUntilAsync(() => session.ActiveEventSubscriberCount == 0);
}
[Fact]
public async Task StreamEventsAsync_WhenCanceled_DetachesSubscriber()
{
FakeWorkerClient workerClient = new();
GatewaySession session = CreateReadySession(workerClient);
EventStreamService service = CreateService(new FakeSessionManager(session));
using CancellationTokenSource cancellationTokenSource = new();
await using IAsyncEnumerator<MxEvent> subscriber = service
.StreamEventsAsync(CreateRequest(session.SessionId), cancellationTokenSource.Token)
.GetAsyncEnumerator(cancellationTokenSource.Token);
Task<bool> moveTask = subscriber.MoveNextAsync().AsTask();
await WaitUntilAsync(() => session.ActiveEventSubscriberCount == 1);
await cancellationTokenSource.CancelAsync();
await Assert.ThrowsAnyAsync<OperationCanceledException>(
async () => await moveTask.WaitAsync(TestTimeout));
await subscriber.DisposeAsync();
await WaitUntilAsync(() => session.ActiveEventSubscriberCount == 0);
}
[Fact]
public async Task StreamEventsAsync_WhenStreamQueueOverflows_FaultsSessionAndReportsOverflow()
{
FakeWorkerClient workerClient = new();
GatewaySession session = CreateReadySession(workerClient);
using GatewayMetrics metrics = new();
EventStreamService service = CreateService(
new FakeSessionManager(session),
metrics,
queueCapacity: 1);
workerClient.Events.Add(CreateWorkerEvent(sequence: 1, MxEventFamily.OnDataChange));
workerClient.Events.Add(CreateWorkerEvent(sequence: 2, MxEventFamily.OnDataChange));
workerClient.Events.Add(CreateWorkerEvent(sequence: 3, MxEventFamily.OnDataChange));
workerClient.CompleteAfterConfiguredEvents = true;
await using IAsyncEnumerator<MxEvent> subscriber = service
.StreamEventsAsync(CreateRequest(session.SessionId), CancellationToken.None)
.GetAsyncEnumerator();
Assert.True(await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout));
await WaitUntilAsync(() => session.State == SessionState.Faulted);
SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(
async () => await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout));
Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, exception.ErrorCode);
Assert.Equal(SessionState.Faulted, session.State);
Assert.Equal(1, metrics.GetSnapshot().QueueOverflows);
Assert.Equal(1, metrics.GetSnapshot().Faults);
}
[Fact]
public async Task StreamEventsAsync_DoesNotSynthesizeOperationComplete()
{
FakeWorkerClient workerClient = new();
GatewaySession session = CreateReadySession(workerClient);
EventStreamService service = CreateService(new FakeSessionManager(session));
workerClient.Events.Add(CreateWorkerEvent(sequence: 10, MxEventFamily.OnWriteComplete));
workerClient.CompleteAfterConfiguredEvents = true;
List<MxEvent> events = await CollectEventsAsync(service, session.SessionId);
MxEvent mxEvent = Assert.Single(events);
Assert.Equal(MxEventFamily.OnWriteComplete, mxEvent.Family);
Assert.DoesNotContain(events, candidate => candidate.Family == MxEventFamily.OperationComplete);
}
[Fact]
public async Task StreamEventsAsync_WhenWorkerEventStreamFaults_PropagatesTerminalFault()
{
FakeWorkerClient workerClient = new()
{
TerminalException = new WorkerClientException(
WorkerClientErrorCode.WorkerFaulted,
"worker terminal fault"),
};
GatewaySession session = CreateReadySession(workerClient);
using GatewayMetrics metrics = new();
EventStreamService service = CreateService(new FakeSessionManager(session), metrics);
await using IAsyncEnumerator<MxEvent> subscriber = service
.StreamEventsAsync(CreateRequest(session.SessionId), CancellationToken.None)
.GetAsyncEnumerator();
WorkerClientException exception = await Assert.ThrowsAsync<WorkerClientException>(
async () => await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout));
Assert.Equal(WorkerClientErrorCode.WorkerFaulted, exception.ErrorCode);
Assert.Equal(SessionState.Faulted, session.State);
Assert.Equal(1, metrics.GetSnapshot().Faults);
}
private static EventStreamService CreateService(
FakeSessionManager sessionManager,
GatewayMetrics? metrics = null,
int queueCapacity = 8)
{
return new EventStreamService(
sessionManager,
Options.Create(new GatewayOptions
{
Events = new EventOptions
{
QueueCapacity = queueCapacity,
},
}),
new MxAccessGrpcMapper(),
metrics ?? new GatewayMetrics(),
NullLogger<EventStreamService>.Instance);
}
private static async Task<List<MxEvent>> CollectEventsAsync(
EventStreamService service,
string sessionId)
{
List<MxEvent> events = [];
await foreach (MxEvent mxEvent in service
.StreamEventsAsync(CreateRequest(sessionId), CancellationToken.None)
.WithCancellation(CancellationToken.None))
{
events.Add(mxEvent);
}
return events;
}
private static StreamEventsRequest CreateRequest(string sessionId)
{
return new StreamEventsRequest
{
SessionId = sessionId,
};
}
private static GatewaySession CreateReadySession(FakeWorkerClient workerClient)
{
GatewaySession session = new(
"session-events",
GatewayContractInfo.DefaultBackendName,
"pipe",
"nonce",
"client",
"client-session",
"client-correlation",
TimeSpan.FromSeconds(30),
TimeSpan.FromSeconds(30),
TimeSpan.FromSeconds(10),
DateTimeOffset.UtcNow);
session.AttachWorkerClient(workerClient);
session.MarkReady();
return session;
}
private static WorkerEvent CreateWorkerEvent(
ulong sequence,
MxEventFamily family)
{
MxEvent mxEvent = new()
{
SessionId = "session-events",
Family = family,
WorkerSequence = sequence,
};
switch (family)
{
case MxEventFamily.OnDataChange:
mxEvent.OnDataChange = new OnDataChangeEvent();
break;
case MxEventFamily.OnWriteComplete:
mxEvent.OnWriteComplete = new OnWriteCompleteEvent();
break;
case MxEventFamily.OperationComplete:
mxEvent.OperationComplete = new OperationCompleteEvent();
break;
case MxEventFamily.OnBufferedDataChange:
mxEvent.OnBufferedDataChange = new OnBufferedDataChangeEvent();
break;
}
return new WorkerEvent
{
Event = mxEvent,
};
}
private static async Task WaitUntilAsync(Func<bool> predicate)
{
using CancellationTokenSource cancellationTokenSource = new(TestTimeout);
while (!predicate())
{
await Task.Delay(TimeSpan.FromMilliseconds(10), cancellationTokenSource.Token);
}
}
private sealed class FakeSessionManager(GatewaySession session) : ISessionManager
{
public Task<GatewaySession> OpenSessionAsync(
SessionOpenRequest request,
string? clientIdentity,
CancellationToken cancellationToken)
{
return Task.FromResult(session);
}
public bool TryGetSession(
string sessionId,
out GatewaySession gatewaySession)
{
gatewaySession = session;
return string.Equals(sessionId, session.SessionId, StringComparison.Ordinal);
}
public Task<WorkerCommandReply> InvokeAsync(
string sessionId,
WorkerCommand command,
CancellationToken cancellationToken)
{
return Task.FromResult(new WorkerCommandReply());
}
public IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
string sessionId,
CancellationToken cancellationToken)
{
return session.ReadEventsAsync(cancellationToken);
}
public Task<SessionCloseResult> CloseSessionAsync(
string sessionId,
CancellationToken cancellationToken)
{
return Task.FromResult(new SessionCloseResult(sessionId, SessionState.Closed, AlreadyClosed: false));
}
public Task<int> CloseExpiredLeasesAsync(
DateTimeOffset now,
CancellationToken cancellationToken)
{
return Task.FromResult(0);
}
public Task ShutdownAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
private sealed class FakeWorkerClient : IWorkerClient
{
public List<WorkerEvent> Events { get; } = [];
public bool CompleteAfterConfiguredEvents { get; set; }
public Exception? TerminalException { get; init; }
public string SessionId { get; } = "session-events";
public int? ProcessId { get; } = 4321;
public WorkerClientState State { get; private set; } = WorkerClientState.Ready;
public DateTimeOffset LastHeartbeatAt { get; } = DateTimeOffset.UtcNow;
public Task StartAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
public Task<WorkerCommandReply> InvokeAsync(
WorkerCommand command,
TimeSpan timeout,
CancellationToken cancellationToken)
{
return Task.FromResult(new WorkerCommandReply());
}
public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
[EnumeratorCancellation] CancellationToken cancellationToken)
{
foreach (WorkerEvent workerEvent in Events)
{
cancellationToken.ThrowIfCancellationRequested();
yield return workerEvent;
}
if (TerminalException is not null)
{
throw TerminalException;
}
if (CompleteAfterConfiguredEvents)
{
yield break;
}
await Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken);
}
public Task ShutdownAsync(
TimeSpan timeout,
CancellationToken cancellationToken)
{
State = WorkerClientState.Closed;
return Task.CompletedTask;
}
public void Kill(string reason)
{
State = WorkerClientState.Faulted;
}
public ValueTask DisposeAsync()
{
return ValueTask.CompletedTask;
}
}
}
@@ -184,6 +184,7 @@ public sealed class MxAccessGatewayServiceTests
identityAccessor ?? new GatewayRequestIdentityAccessor(),
new MxAccessGrpcRequestValidator(),
new MxAccessGrpcMapper(),
new FakeEventStreamService(sessionManager),
NullLogger<MxAccessGatewayService>.Instance);
}
@@ -275,6 +276,11 @@ public sealed class MxAccessGatewayServiceTests
public List<WorkerEvent> Events { get; } = [];
public void RecordReadEventsSessionId(string sessionId)
{
LastReadEventsSessionId = sessionId;
}
public Task<GatewaySession> OpenSessionAsync(
SessionOpenRequest request,
string? clientIdentity,
@@ -343,6 +349,27 @@ public sealed class MxAccessGatewayServiceTests
}
}
private sealed class FakeEventStreamService(FakeSessionManager sessionManager) : IEventStreamService
{
public async IAsyncEnumerable<MxEvent> StreamEventsAsync(
StreamEventsRequest request,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
sessionManager.RecordReadEventsSessionId(request.SessionId);
foreach (WorkerEvent workerEvent in sessionManager.Events)
{
cancellationToken.ThrowIfCancellationRequested();
await Task.Yield();
if (workerEvent.Event.WorkerSequence <= request.AfterWorkerSequence)
{
continue;
}
yield return workerEvent.Event;
}
}
}
private sealed class FakeWorkerClient(int processId) : IWorkerClient
{
public string SessionId { get; } = "session-1";
@@ -109,6 +109,32 @@ public sealed class WorkerClientTests
Assert.Equal(MxEventFamily.OperationComplete, events.Current.Event.Family);
}
[Fact]
public async Task ReadLoop_WhenEventQueueOverflows_FaultsClient()
{
await using PipePair pipePair = await PipePair.CreateAsync();
await using WorkerClient client = CreateClient(
pipePair,
new WorkerClientOptions
{
EventChannelCapacity = 1,
HeartbeatGrace = TimeSpan.FromSeconds(30),
HeartbeatCheckInterval = TimeSpan.FromSeconds(30),
});
await CompleteHandshakeAsync(client, pipePair);
await pipePair.WorkerWriter.WriteAsync(
CreateEventEnvelope(sequence: 11, MxEventFamily.OnDataChange));
await pipePair.WorkerWriter.WriteAsync(
CreateEventEnvelope(sequence: 12, MxEventFamily.OnDataChange));
await WaitUntilAsync(
() => client.State == WorkerClientState.Faulted,
TestTimeout);
Assert.Equal(WorkerClientState.Faulted, client.State);
}
[Fact]
public async Task ReadLoop_WhenPipeDisconnects_FaultsClient()
{