diff --git a/docs/gateway-process-design.md b/docs/gateway-process-design.md index 33b2b7e..bab078c 100644 --- a/docs/gateway-process-design.md +++ b/docs/gateway-process-design.md @@ -206,13 +206,23 @@ accounting and a clear fan-out policy. Behavior: 1. Validate session id and authorize event access. -2. Attach a stream cursor to the session event channel. -3. Send events in worker sequence order. -4. Stop on client cancellation, session close, or session fault. -5. Emit a terminal status when the session faults if gRPC status alone cannot +2. Attach the single active subscriber lease for the session. +3. Read worker events into a bounded public stream queue. +4. Send events in worker sequence order. +5. Stop on client cancellation, session close, or session fault. +6. Emit a terminal status when the session faults if gRPC status alone cannot preserve the required details. -The gateway must not reorder events from one worker. +`EventStreamService` owns subscriber tracking and public stream backpressure. +The default policy allows one active subscriber per session. A second subscriber +is rejected with `EventSubscriberAlreadyActive`. Stream cancellation releases +the subscriber lease so a later stream can attach to the session. + +The gateway must not reorder events from one worker. `EventStreamService` writes +mapped events to a bounded first-in, first-out queue and faults the session with +`EventQueueOverflow` if the queue fills. The gateway does not synthesize +`OperationComplete`; it forwards that family only when the worker reports a +native MXAccess `OperationComplete` event. ## Web Dashboard @@ -584,7 +594,8 @@ worker MXAccess event -> worker outbound event queue -> worker pipe writer -> gateway read loop - -> session event channel + -> worker client event queue + -> EventStreamService bounded stream queue -> gRPC StreamEvents ``` @@ -598,13 +609,15 @@ The gateway should record: Default backpressure policy for parity testing should be fail-fast: -1. If the session event channel fills, fault the session. +1. If the worker client event queue fills, fault the worker client. +2. If the public stream queue fills, fault the gateway session. 2. Preserve the overflow details in logs and metrics. 3. Do not silently drop data-change events. -Do not set a production event-rate target before measurement. Emit event rate, -queue depth, stream send latency, and overflow metrics. Later production modes -may support explicit coalescing by item handle as an opt-in behavior. +Do not set a production event-rate target before measurement. `GatewayMetrics` +records received event counts by family, queue depth, stream disconnects, and +overflow counts. Later production modes may support explicit coalescing by item +handle as an opt-in behavior. The gateway should not synthesize `OperationComplete` from write completion, command replies, ASB completion queues, or completion-only status frames. Forward diff --git a/gateway.md b/gateway.md index 2dfcbc2..db6ae35 100644 --- a/gateway.md +++ b/gateway.md @@ -520,11 +520,7 @@ Worker policy: - bounded outbound event channel, - never block MXAccess event handler on pipe writes, -- if the outbound channel is full, apply configured policy: - - disconnect session, - - drop oldest low-priority data-change events, - - coalesce data changes by item handle, - - or block briefly then fault. +- fail the worker session when the outbound channel is full. For full parity testing, default should be fail-fast rather than silent drop. For production high-rate telemetry, add explicit coalescing modes. @@ -533,9 +529,15 @@ Gateway policy: - one event sequencer per session, - preserve per-session event order, -- support multiple client event subscribers only if explicitly required, -- apply backpressure from slow gRPC streams, -- disconnect or coalesce according to client-selected mode. +- allow one active client event subscriber per session, +- reject a second subscriber with a clear session error, +- use a bounded `EventStreamService` queue between worker events and gRPC + writes, +- fault the session when the bounded stream queue overflows, +- detach the subscriber when the stream is canceled. + +The gateway forwards only events reported by the worker. It does not synthesize +`OperationComplete` from write completion, command replies, or status frames. ## Isolation And Fault Handling @@ -857,10 +859,11 @@ translation code testable. The gateway maps `MxAccessGateway` to `MxAccessGatewayService`. The service implements `OpenSession`, `CloseSession`, `Invoke`, and `StreamEvents` by validating public requests, delegating session work to `ISessionManager`, and -using explicit mapper code for public-to-worker commands, worker replies, and -events. Missing sessions and transport failures return gRPC status errors; -worker command replies preserve MXAccess HRESULT and status details in the -public reply. +using explicit mapper code for public-to-worker commands and worker replies. +`StreamEvents` delegates subscriber ownership, ordering, and backpressure to +`EventStreamService`. Missing sessions and transport failures return gRPC +status errors; worker command replies preserve MXAccess HRESULT and status +details in the public reply. ## C# Worker Versus C++ Worker diff --git a/src/MxGateway.Server/GatewayApplication.cs b/src/MxGateway.Server/GatewayApplication.cs index 887cdcf..15b2b3b 100644 --- a/src/MxGateway.Server/GatewayApplication.cs +++ b/src/MxGateway.Server/GatewayApplication.cs @@ -35,6 +35,7 @@ public static class GatewayApplication builder.Services.AddSingleton(); builder.Services.AddSingleton(); builder.Services.AddSingleton(); + builder.Services.AddSingleton(); builder.Services.AddWorkerProcessLauncher(); builder.Services.AddGatewaySessions(); builder.Services.AddGatewayDashboard(); diff --git a/src/MxGateway.Server/Grpc/EventStreamService.cs b/src/MxGateway.Server/Grpc/EventStreamService.cs new file mode 100644 index 0000000..1aacd39 --- /dev/null +++ b/src/MxGateway.Server/Grpc/EventStreamService.cs @@ -0,0 +1,140 @@ +using System.Runtime.CompilerServices; +using System.Threading.Channels; +using Microsoft.Extensions.Options; +using MxGateway.Contracts.Proto; +using MxGateway.Server.Configuration; +using MxGateway.Server.Metrics; +using MxGateway.Server.Sessions; +using MxGateway.Server.Workers; + +namespace MxGateway.Server.Grpc; + +public sealed class EventStreamService( + ISessionManager sessionManager, + IOptions options, + MxAccessGrpcMapper mapper, + GatewayMetrics metrics, + ILogger logger) : IEventStreamService +{ + public async IAsyncEnumerable StreamEventsAsync( + StreamEventsRequest request, + [EnumeratorCancellation] CancellationToken cancellationToken) + { + if (!sessionManager.TryGetSession(request.SessionId, out GatewaySession session)) + { + throw new SessionManagerException( + SessionManagerErrorCode.SessionNotFound, + $"Session {request.SessionId} was not found."); + } + + using IDisposable subscriber = session.AttachEventSubscriber( + options.Value.Sessions.AllowMultipleEventSubscribers); + using CancellationTokenSource streamCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + + int streamQueueDepth = 0; + Channel eventQueue = Channel.CreateBounded( + new BoundedChannelOptions(options.Value.Events.QueueCapacity) + { + SingleReader = true, + SingleWriter = true, + FullMode = BoundedChannelFullMode.Wait, + AllowSynchronousContinuations = false, + }); + Task producerTask = ProduceEventsAsync( + session, + request.AfterWorkerSequence, + eventQueue.Writer, + () => + { + int depth = Interlocked.Increment(ref streamQueueDepth); + metrics.SetEventQueueDepth(depth); + }, + streamCts.Token); + + try + { + await foreach (MxEvent mxEvent in eventQueue.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false)) + { + int depth = Math.Max(0, Interlocked.Decrement(ref streamQueueDepth)); + metrics.SetEventQueueDepth(depth); + yield return mxEvent; + } + + await producerTask.ConfigureAwait(false); + } + finally + { + await streamCts.CancelAsync().ConfigureAwait(false); + subscriber.Dispose(); + metrics.StreamDisconnected("Detached"); + + try + { + await producerTask.ConfigureAwait(false); + } + catch (OperationCanceledException) when (streamCts.IsCancellationRequested) + { + } + catch (Exception exception) + { + logger.LogDebug( + exception, + "Event stream producer stopped for session {SessionId}.", + request.SessionId); + } + } + } + + private async Task ProduceEventsAsync( + GatewaySession session, + ulong afterWorkerSequence, + ChannelWriter writer, + Action eventQueued, + CancellationToken cancellationToken) + { + try + { + await foreach (WorkerEvent workerEvent in session + .ReadEventsAsync(cancellationToken) + .WithCancellation(cancellationToken) + .ConfigureAwait(false)) + { + MxEvent publicEvent = mapper.MapEvent(workerEvent); + if (publicEvent.WorkerSequence <= afterWorkerSequence) + { + continue; + } + + if (!writer.TryWrite(publicEvent)) + { + string message = $"Session {session.SessionId} event stream queue overflowed."; + session.MarkFaulted(message); + metrics.QueueOverflow("grpc-event-stream"); + metrics.Fault(SessionManagerErrorCode.EventQueueOverflow.ToString()); + writer.TryComplete(new SessionManagerException( + SessionManagerErrorCode.EventQueueOverflow, + message)); + return; + } + + eventQueued(); + } + + writer.TryComplete(); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + writer.TryComplete(); + } + catch (Exception exception) + { + if (exception is WorkerClientException) + { + session.MarkFaulted(exception.Message); + metrics.Fault(WorkerClientErrorCode.WorkerFaulted.ToString()); + } + + writer.TryComplete(exception); + } + } +} diff --git a/src/MxGateway.Server/Grpc/IEventStreamService.cs b/src/MxGateway.Server/Grpc/IEventStreamService.cs new file mode 100644 index 0000000..06f4acb --- /dev/null +++ b/src/MxGateway.Server/Grpc/IEventStreamService.cs @@ -0,0 +1,10 @@ +using MxGateway.Contracts.Proto; + +namespace MxGateway.Server.Grpc; + +public interface IEventStreamService +{ + IAsyncEnumerable StreamEventsAsync( + StreamEventsRequest request, + CancellationToken cancellationToken); +} diff --git a/src/MxGateway.Server/Grpc/MxAccessGatewayService.cs b/src/MxGateway.Server/Grpc/MxAccessGatewayService.cs index 1ee8777..2b75baf 100644 --- a/src/MxGateway.Server/Grpc/MxAccessGatewayService.cs +++ b/src/MxGateway.Server/Grpc/MxAccessGatewayService.cs @@ -12,6 +12,7 @@ public sealed class MxAccessGatewayService( IGatewayRequestIdentityAccessor identityAccessor, MxAccessGrpcRequestValidator requestValidator, MxAccessGrpcMapper mapper, + IEventStreamService eventStreamService, ILogger logger) : MxAccessGateway.MxAccessGatewayBase { public override async Task OpenSession( @@ -102,17 +103,11 @@ public sealed class MxAccessGatewayService( try { requestValidator.ValidateStreamEvents(request); - await foreach (WorkerEvent workerEvent in sessionManager - .ReadEventsAsync(request.SessionId, context.CancellationToken) + await foreach (MxEvent publicEvent in eventStreamService + .StreamEventsAsync(request, context.CancellationToken) .WithCancellation(context.CancellationToken) .ConfigureAwait(false)) { - MxEvent publicEvent = mapper.MapEvent(workerEvent); - if (publicEvent.WorkerSequence <= request.AfterWorkerSequence) - { - continue; - } - await responseStream.WriteAsync(publicEvent).ConfigureAwait(false); } } @@ -154,6 +149,8 @@ public sealed class MxAccessGatewayService( { SessionManagerErrorCode.SessionNotFound => StatusCode.NotFound, SessionManagerErrorCode.SessionNotReady => StatusCode.FailedPrecondition, + SessionManagerErrorCode.EventSubscriberAlreadyActive => StatusCode.ResourceExhausted, + SessionManagerErrorCode.EventQueueOverflow => StatusCode.ResourceExhausted, SessionManagerErrorCode.SessionLimitExceeded => StatusCode.ResourceExhausted, SessionManagerErrorCode.OpenFailed => StatusCode.Unavailable, SessionManagerErrorCode.CloseFailed => StatusCode.Unavailable, diff --git a/src/MxGateway.Server/Sessions/GatewaySession.cs b/src/MxGateway.Server/Sessions/GatewaySession.cs index d010669..dab1f34 100644 --- a/src/MxGateway.Server/Sessions/GatewaySession.cs +++ b/src/MxGateway.Server/Sessions/GatewaySession.cs @@ -13,6 +13,7 @@ public sealed class GatewaySession private DateTimeOffset _lastClientActivityAt; private DateTimeOffset? _leaseExpiresAt; private bool _closeStarted; + private int _activeEventSubscriberCount; public GatewaySession( string sessionId, @@ -131,6 +132,17 @@ public sealed class GatewaySession } } + public int ActiveEventSubscriberCount + { + get + { + lock (_syncRoot) + { + return _activeEventSubscriberCount; + } + } + } + public void AttachWorkerClient(IWorkerClient workerClient) { ArgumentNullException.ThrowIfNull(workerClient); @@ -202,6 +214,29 @@ public sealed class GatewaySession } } + public IDisposable AttachEventSubscriber(bool allowMultipleSubscribers) + { + lock (_syncRoot) + { + if (_state != SessionState.Ready || _workerClient?.State != WorkerClientState.Ready) + { + throw new SessionManagerException( + SessionManagerErrorCode.SessionNotReady, + $"Session {SessionId} is not ready for event streaming. Current state is {_state}."); + } + + if (!allowMultipleSubscribers && _activeEventSubscriberCount > 0) + { + throw new SessionManagerException( + SessionManagerErrorCode.EventSubscriberAlreadyActive, + $"Session {SessionId} already has an active event stream subscriber."); + } + + _activeEventSubscriberCount++; + return new EventSubscriberLease(this); + } + } + public async Task InvokeAsync( WorkerCommand command, CancellationToken cancellationToken) @@ -287,4 +322,31 @@ public sealed class GatewaySession return _workerClient; } } + + private void DetachEventSubscriber() + { + lock (_syncRoot) + { + if (_activeEventSubscriberCount > 0) + { + _activeEventSubscriberCount--; + } + } + } + + private sealed class EventSubscriberLease(GatewaySession session) : IDisposable + { + private bool _disposed; + + public void Dispose() + { + if (_disposed) + { + return; + } + + session.DetachEventSubscriber(); + _disposed = true; + } + } } diff --git a/src/MxGateway.Server/Sessions/SessionManagerErrorCode.cs b/src/MxGateway.Server/Sessions/SessionManagerErrorCode.cs index dcbca45..f7e723c 100644 --- a/src/MxGateway.Server/Sessions/SessionManagerErrorCode.cs +++ b/src/MxGateway.Server/Sessions/SessionManagerErrorCode.cs @@ -4,6 +4,8 @@ public enum SessionManagerErrorCode { SessionNotFound, SessionNotReady, + EventSubscriberAlreadyActive, + EventQueueOverflow, SessionLimitExceeded, OpenFailed, CloseFailed, diff --git a/src/MxGateway.Server/Workers/WorkerClient.cs b/src/MxGateway.Server/Workers/WorkerClient.cs index 9e0daa9..18b8cb6 100644 --- a/src/MxGateway.Server/Workers/WorkerClient.cs +++ b/src/MxGateway.Server/Workers/WorkerClient.cs @@ -29,6 +29,7 @@ public sealed class WorkerClient : IWorkerClient private WorkerClientState _state; private DateTimeOffset _lastHeartbeatAt; private int? _processId; + private int _eventQueueDepth; private Task? _readLoopTask; private Task? _writeLoopTask; private Task? _heartbeatLoopTask; @@ -197,6 +198,8 @@ public sealed class WorkerClient : IWorkerClient { await foreach (WorkerEvent workerEvent in _events.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false)) { + int queueDepth = Math.Max(0, Interlocked.Decrement(ref _eventQueueDepth)); + _metrics?.SetEventQueueDepth(queueDepth); yield return workerEvent; } } @@ -394,11 +397,6 @@ public sealed class WorkerClient : IWorkerClient _metrics?.EventReceived(SessionId, workerEvent.Event.Family.ToString()); } - if (!await _events.Writer.WaitToWriteAsync(cancellationToken).ConfigureAwait(false)) - { - return; - } - if (!_events.Writer.TryWrite(workerEvent)) { _metrics?.QueueOverflow("worker-events"); @@ -406,7 +404,11 @@ public sealed class WorkerClient : IWorkerClient WorkerClientErrorCode.ProtocolViolation, "Worker event channel rejected an event.", null); + return; } + + int queueDepth = Interlocked.Increment(ref _eventQueueDepth); + _metrics?.SetEventQueueDepth(queueDepth); } private void CompleteCommand(WorkerEnvelope envelope) diff --git a/src/MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs b/src/MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs new file mode 100644 index 0000000..4960895 --- /dev/null +++ b/src/MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs @@ -0,0 +1,383 @@ +using System.Runtime.CompilerServices; +using Grpc.Core; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using MxGateway.Contracts; +using MxGateway.Contracts.Proto; +using MxGateway.Server.Configuration; +using MxGateway.Server.Grpc; +using MxGateway.Server.Metrics; +using MxGateway.Server.Sessions; +using MxGateway.Server.Workers; + +namespace MxGateway.Tests.Gateway.Grpc; + +public sealed class EventStreamServiceTests +{ + private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(5); + + [Fact] + public async Task StreamEventsAsync_YieldsEventsInWorkerOrder() + { + FakeWorkerClient workerClient = new(); + GatewaySession session = CreateReadySession(workerClient); + FakeSessionManager sessionManager = new(session); + using GatewayMetrics metrics = new(); + EventStreamService service = CreateService(sessionManager, metrics: metrics); + workerClient.Events.Add(CreateWorkerEvent(sequence: 10, MxEventFamily.OnDataChange)); + workerClient.Events.Add(CreateWorkerEvent(sequence: 11, MxEventFamily.OnWriteComplete)); + workerClient.CompleteAfterConfiguredEvents = true; + + List events = await CollectEventsAsync(service, session.SessionId); + + Assert.Equal([10UL, 11UL], events.Select(mxEvent => mxEvent.WorkerSequence).ToArray()); + Assert.Equal(MxEventFamily.OnDataChange, events[0].Family); + Assert.Equal(MxEventFamily.OnWriteComplete, events[1].Family); + Assert.Equal(1, metrics.GetSnapshot().StreamDisconnects); + } + + [Fact] + public async Task StreamEventsAsync_WhenSecondSubscriberStarts_RejectsClearly() + { + FakeWorkerClient workerClient = new(); + GatewaySession session = CreateReadySession(workerClient); + EventStreamService service = CreateService(new FakeSessionManager(session)); + using CancellationTokenSource firstSubscriberCancellation = new(); + await using IAsyncEnumerator firstSubscriber = service + .StreamEventsAsync(CreateRequest(session.SessionId), firstSubscriberCancellation.Token) + .GetAsyncEnumerator(firstSubscriberCancellation.Token); + Task firstMoveTask = firstSubscriber.MoveNextAsync().AsTask(); + + await WaitUntilAsync(() => session.ActiveEventSubscriberCount == 1); + await using IAsyncEnumerator secondSubscriber = service + .StreamEventsAsync(CreateRequest(session.SessionId), CancellationToken.None) + .GetAsyncEnumerator(); + + SessionManagerException exception = await Assert.ThrowsAsync( + async () => await secondSubscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout)); + + Assert.Equal(SessionManagerErrorCode.EventSubscriberAlreadyActive, exception.ErrorCode); + await firstSubscriberCancellation.CancelAsync(); + await Assert.ThrowsAnyAsync( + async () => await firstMoveTask.WaitAsync(TestTimeout)); + await firstSubscriber.DisposeAsync(); + await WaitUntilAsync(() => session.ActiveEventSubscriberCount == 0); + } + + [Fact] + public async Task StreamEventsAsync_WhenCanceled_DetachesSubscriber() + { + FakeWorkerClient workerClient = new(); + GatewaySession session = CreateReadySession(workerClient); + EventStreamService service = CreateService(new FakeSessionManager(session)); + using CancellationTokenSource cancellationTokenSource = new(); + await using IAsyncEnumerator subscriber = service + .StreamEventsAsync(CreateRequest(session.SessionId), cancellationTokenSource.Token) + .GetAsyncEnumerator(cancellationTokenSource.Token); + Task moveTask = subscriber.MoveNextAsync().AsTask(); + + await WaitUntilAsync(() => session.ActiveEventSubscriberCount == 1); + await cancellationTokenSource.CancelAsync(); + await Assert.ThrowsAnyAsync( + async () => await moveTask.WaitAsync(TestTimeout)); + await subscriber.DisposeAsync(); + + await WaitUntilAsync(() => session.ActiveEventSubscriberCount == 0); + } + + [Fact] + public async Task StreamEventsAsync_WhenStreamQueueOverflows_FaultsSessionAndReportsOverflow() + { + FakeWorkerClient workerClient = new(); + GatewaySession session = CreateReadySession(workerClient); + using GatewayMetrics metrics = new(); + EventStreamService service = CreateService( + new FakeSessionManager(session), + metrics, + queueCapacity: 1); + workerClient.Events.Add(CreateWorkerEvent(sequence: 1, MxEventFamily.OnDataChange)); + workerClient.Events.Add(CreateWorkerEvent(sequence: 2, MxEventFamily.OnDataChange)); + workerClient.Events.Add(CreateWorkerEvent(sequence: 3, MxEventFamily.OnDataChange)); + workerClient.CompleteAfterConfiguredEvents = true; + await using IAsyncEnumerator subscriber = service + .StreamEventsAsync(CreateRequest(session.SessionId), CancellationToken.None) + .GetAsyncEnumerator(); + + Assert.True(await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout)); + await WaitUntilAsync(() => session.State == SessionState.Faulted); + SessionManagerException exception = await Assert.ThrowsAsync( + async () => await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout)); + + Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, exception.ErrorCode); + Assert.Equal(SessionState.Faulted, session.State); + Assert.Equal(1, metrics.GetSnapshot().QueueOverflows); + Assert.Equal(1, metrics.GetSnapshot().Faults); + } + + [Fact] + public async Task StreamEventsAsync_DoesNotSynthesizeOperationComplete() + { + FakeWorkerClient workerClient = new(); + GatewaySession session = CreateReadySession(workerClient); + EventStreamService service = CreateService(new FakeSessionManager(session)); + workerClient.Events.Add(CreateWorkerEvent(sequence: 10, MxEventFamily.OnWriteComplete)); + workerClient.CompleteAfterConfiguredEvents = true; + + List events = await CollectEventsAsync(service, session.SessionId); + + MxEvent mxEvent = Assert.Single(events); + Assert.Equal(MxEventFamily.OnWriteComplete, mxEvent.Family); + Assert.DoesNotContain(events, candidate => candidate.Family == MxEventFamily.OperationComplete); + } + + [Fact] + public async Task StreamEventsAsync_WhenWorkerEventStreamFaults_PropagatesTerminalFault() + { + FakeWorkerClient workerClient = new() + { + TerminalException = new WorkerClientException( + WorkerClientErrorCode.WorkerFaulted, + "worker terminal fault"), + }; + GatewaySession session = CreateReadySession(workerClient); + using GatewayMetrics metrics = new(); + EventStreamService service = CreateService(new FakeSessionManager(session), metrics); + await using IAsyncEnumerator subscriber = service + .StreamEventsAsync(CreateRequest(session.SessionId), CancellationToken.None) + .GetAsyncEnumerator(); + + WorkerClientException exception = await Assert.ThrowsAsync( + async () => await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout)); + + Assert.Equal(WorkerClientErrorCode.WorkerFaulted, exception.ErrorCode); + Assert.Equal(SessionState.Faulted, session.State); + Assert.Equal(1, metrics.GetSnapshot().Faults); + } + + private static EventStreamService CreateService( + FakeSessionManager sessionManager, + GatewayMetrics? metrics = null, + int queueCapacity = 8) + { + return new EventStreamService( + sessionManager, + Options.Create(new GatewayOptions + { + Events = new EventOptions + { + QueueCapacity = queueCapacity, + }, + }), + new MxAccessGrpcMapper(), + metrics ?? new GatewayMetrics(), + NullLogger.Instance); + } + + private static async Task> CollectEventsAsync( + EventStreamService service, + string sessionId) + { + List events = []; + await foreach (MxEvent mxEvent in service + .StreamEventsAsync(CreateRequest(sessionId), CancellationToken.None) + .WithCancellation(CancellationToken.None)) + { + events.Add(mxEvent); + } + + return events; + } + + private static StreamEventsRequest CreateRequest(string sessionId) + { + return new StreamEventsRequest + { + SessionId = sessionId, + }; + } + + private static GatewaySession CreateReadySession(FakeWorkerClient workerClient) + { + GatewaySession session = new( + "session-events", + GatewayContractInfo.DefaultBackendName, + "pipe", + "nonce", + "client", + "client-session", + "client-correlation", + TimeSpan.FromSeconds(30), + TimeSpan.FromSeconds(30), + TimeSpan.FromSeconds(10), + DateTimeOffset.UtcNow); + session.AttachWorkerClient(workerClient); + session.MarkReady(); + + return session; + } + + private static WorkerEvent CreateWorkerEvent( + ulong sequence, + MxEventFamily family) + { + MxEvent mxEvent = new() + { + SessionId = "session-events", + Family = family, + WorkerSequence = sequence, + }; + + switch (family) + { + case MxEventFamily.OnDataChange: + mxEvent.OnDataChange = new OnDataChangeEvent(); + break; + case MxEventFamily.OnWriteComplete: + mxEvent.OnWriteComplete = new OnWriteCompleteEvent(); + break; + case MxEventFamily.OperationComplete: + mxEvent.OperationComplete = new OperationCompleteEvent(); + break; + case MxEventFamily.OnBufferedDataChange: + mxEvent.OnBufferedDataChange = new OnBufferedDataChangeEvent(); + break; + } + + return new WorkerEvent + { + Event = mxEvent, + }; + } + + private static async Task WaitUntilAsync(Func predicate) + { + using CancellationTokenSource cancellationTokenSource = new(TestTimeout); + while (!predicate()) + { + await Task.Delay(TimeSpan.FromMilliseconds(10), cancellationTokenSource.Token); + } + } + + private sealed class FakeSessionManager(GatewaySession session) : ISessionManager + { + public Task OpenSessionAsync( + SessionOpenRequest request, + string? clientIdentity, + CancellationToken cancellationToken) + { + return Task.FromResult(session); + } + + public bool TryGetSession( + string sessionId, + out GatewaySession gatewaySession) + { + gatewaySession = session; + return string.Equals(sessionId, session.SessionId, StringComparison.Ordinal); + } + + public Task InvokeAsync( + string sessionId, + WorkerCommand command, + CancellationToken cancellationToken) + { + return Task.FromResult(new WorkerCommandReply()); + } + + public IAsyncEnumerable ReadEventsAsync( + string sessionId, + CancellationToken cancellationToken) + { + return session.ReadEventsAsync(cancellationToken); + } + + public Task CloseSessionAsync( + string sessionId, + CancellationToken cancellationToken) + { + return Task.FromResult(new SessionCloseResult(sessionId, SessionState.Closed, AlreadyClosed: false)); + } + + public Task CloseExpiredLeasesAsync( + DateTimeOffset now, + CancellationToken cancellationToken) + { + return Task.FromResult(0); + } + + public Task ShutdownAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + } + + private sealed class FakeWorkerClient : IWorkerClient + { + public List Events { get; } = []; + + public bool CompleteAfterConfiguredEvents { get; set; } + + public Exception? TerminalException { get; init; } + + public string SessionId { get; } = "session-events"; + + public int? ProcessId { get; } = 4321; + + public WorkerClientState State { get; private set; } = WorkerClientState.Ready; + + public DateTimeOffset LastHeartbeatAt { get; } = DateTimeOffset.UtcNow; + + public Task StartAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + + public Task InvokeAsync( + WorkerCommand command, + TimeSpan timeout, + CancellationToken cancellationToken) + { + return Task.FromResult(new WorkerCommandReply()); + } + + public async IAsyncEnumerable ReadEventsAsync( + [EnumeratorCancellation] CancellationToken cancellationToken) + { + foreach (WorkerEvent workerEvent in Events) + { + cancellationToken.ThrowIfCancellationRequested(); + yield return workerEvent; + } + + if (TerminalException is not null) + { + throw TerminalException; + } + + if (CompleteAfterConfiguredEvents) + { + yield break; + } + + await Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken); + } + + public Task ShutdownAsync( + TimeSpan timeout, + CancellationToken cancellationToken) + { + State = WorkerClientState.Closed; + return Task.CompletedTask; + } + + public void Kill(string reason) + { + State = WorkerClientState.Faulted; + } + + public ValueTask DisposeAsync() + { + return ValueTask.CompletedTask; + } + } +} diff --git a/src/MxGateway.Tests/Gateway/Grpc/MxAccessGatewayServiceTests.cs b/src/MxGateway.Tests/Gateway/Grpc/MxAccessGatewayServiceTests.cs index 85ce3fe..0d7d605 100644 --- a/src/MxGateway.Tests/Gateway/Grpc/MxAccessGatewayServiceTests.cs +++ b/src/MxGateway.Tests/Gateway/Grpc/MxAccessGatewayServiceTests.cs @@ -184,6 +184,7 @@ public sealed class MxAccessGatewayServiceTests identityAccessor ?? new GatewayRequestIdentityAccessor(), new MxAccessGrpcRequestValidator(), new MxAccessGrpcMapper(), + new FakeEventStreamService(sessionManager), NullLogger.Instance); } @@ -275,6 +276,11 @@ public sealed class MxAccessGatewayServiceTests public List Events { get; } = []; + public void RecordReadEventsSessionId(string sessionId) + { + LastReadEventsSessionId = sessionId; + } + public Task OpenSessionAsync( SessionOpenRequest request, string? clientIdentity, @@ -343,6 +349,27 @@ public sealed class MxAccessGatewayServiceTests } } + private sealed class FakeEventStreamService(FakeSessionManager sessionManager) : IEventStreamService + { + public async IAsyncEnumerable StreamEventsAsync( + StreamEventsRequest request, + [EnumeratorCancellation] CancellationToken cancellationToken) + { + sessionManager.RecordReadEventsSessionId(request.SessionId); + foreach (WorkerEvent workerEvent in sessionManager.Events) + { + cancellationToken.ThrowIfCancellationRequested(); + await Task.Yield(); + if (workerEvent.Event.WorkerSequence <= request.AfterWorkerSequence) + { + continue; + } + + yield return workerEvent.Event; + } + } + } + private sealed class FakeWorkerClient(int processId) : IWorkerClient { public string SessionId { get; } = "session-1"; diff --git a/src/MxGateway.Tests/Gateway/Workers/WorkerClientTests.cs b/src/MxGateway.Tests/Gateway/Workers/WorkerClientTests.cs index c519c07..cf55511 100644 --- a/src/MxGateway.Tests/Gateway/Workers/WorkerClientTests.cs +++ b/src/MxGateway.Tests/Gateway/Workers/WorkerClientTests.cs @@ -109,6 +109,32 @@ public sealed class WorkerClientTests Assert.Equal(MxEventFamily.OperationComplete, events.Current.Event.Family); } + [Fact] + public async Task ReadLoop_WhenEventQueueOverflows_FaultsClient() + { + await using PipePair pipePair = await PipePair.CreateAsync(); + await using WorkerClient client = CreateClient( + pipePair, + new WorkerClientOptions + { + EventChannelCapacity = 1, + HeartbeatGrace = TimeSpan.FromSeconds(30), + HeartbeatCheckInterval = TimeSpan.FromSeconds(30), + }); + await CompleteHandshakeAsync(client, pipePair); + + await pipePair.WorkerWriter.WriteAsync( + CreateEventEnvelope(sequence: 11, MxEventFamily.OnDataChange)); + await pipePair.WorkerWriter.WriteAsync( + CreateEventEnvelope(sequence: 12, MxEventFamily.OnDataChange)); + + await WaitUntilAsync( + () => client.State == WorkerClientState.Faulted, + TestTimeout); + + Assert.Equal(WorkerClientState.Faulted, client.State); + } + [Fact] public async Task ReadLoop_WhenPipeDisconnects_FaultsClient() {