using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Microsoft.Extensions.Options;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Configuration;
using MxGateway.Server.Metrics;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;

namespace MxGateway.Server.Grpc;

/// <summary>
/// Streams the events of a gateway session to a caller. Worker events are read
/// by a producer task, mapped to <see cref="MxEvent"/> instances and handed to
/// the consumer through a bounded channel.
/// </summary>
public sealed class EventStreamService(
    ISessionManager sessionManager,
    IOptions options,
    MxAccessGrpcMapper mapper,
    GatewayMetrics metrics,
    ILogger logger) : IEventStreamService
{
    /// <summary>
    /// Attaches an event subscriber to the session named in
    /// <paramref name="request"/> and yields its events until the producer
    /// completes or <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    /// <param name="request">
    /// Carries the session id and the worker sequence after which events are
    /// wanted; events at or below that sequence are not delivered.
    /// </param>
    /// <param name="cancellationToken">Cancels the enumeration.</param>
    /// <returns>The mapped events, in the order the producer queued them.</returns>
    /// <exception cref="SessionManagerException">
    /// Thrown with <see cref="SessionManagerErrorCode.SessionNotFound"/> when
    /// the session manager does not know the requested session.
    /// </exception>
    public async IAsyncEnumerable StreamEventsAsync(
        StreamEventsRequest request,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        bool sessionFound = sessionManager.TryGetSession(request.SessionId, out GatewaySession gatewaySession);
        if (!sessionFound)
        {
            throw new SessionManagerException(
                SessionManagerErrorCode.SessionNotFound,
                $"Session {request.SessionId} was not found.");
        }

        using IDisposable subscription = gatewaySession.AttachEventSubscriber(
            options.Value.Sessions.AllowMultipleEventSubscribers);

        // Linked source: lets the finally block stop the producer even when
        // the caller's own token was never cancelled.
        using CancellationTokenSource linkedCts =
            CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

        // Number of events written to the channel but not yet handed to the caller.
        int pendingEvents = 0;

        BoundedChannelOptions channelOptions = new(options.Value.Events.QueueCapacity)
        {
            SingleReader = true,
            SingleWriter = true,
            FullMode = BoundedChannelFullMode.Wait,
            AllowSynchronousContinuations = false,
        };
        Channel channel = Channel.CreateBounded(channelOptions);

        // Invoked by the producer after each successful write.
        void OnEventQueued()
        {
            int queued = Interlocked.Increment(ref pendingEvents);
            metrics.SetEventQueueDepth(queued);
        }

        Task pump = ProduceEventsAsync(
            gatewaySession,
            request.AfterWorkerSequence,
            channel.Writer,
            OnEventQueued,
            linkedCts.Token);

        try
        {
            await foreach (MxEvent next in channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
            {
                // The producer increments only after its write succeeded, so the
                // reader can get here first; never report a negative depth.
                int remaining = Interlocked.Decrement(ref pendingEvents);
                metrics.SetEventQueueDepth(remaining < 0 ? 0 : remaining);

                yield return next;
            }

            await pump.ConfigureAwait(false);
        }
        finally
        {
            // Order matters: stop the producer, detach the subscriber, record
            // the disconnect, and only then wait for the producer to finish.
            await linkedCts.CancelAsync().ConfigureAwait(false);
            subscription.Dispose();
            metrics.StreamDisconnected("Detached");

            try
            {
                await pump.ConfigureAwait(false);
            }
            catch (OperationCanceledException) when (linkedCts.IsCancellationRequested)
            {
                // Expected: the producer was cancelled by the shutdown above.
            }
            catch (Exception producerFailure)
            {
                logger.LogDebug(
                    producerFailure,
                    "Event stream producer stopped for session {SessionId}.",
                    request.SessionId);
            }
        }
    }

    /// <summary>
    /// Reads worker events from <paramref name="session"/>, maps them and writes
    /// them to <paramref name="writer"/>. The writer is always completed before
    /// this method returns: normally at end of stream or on cancellation, and
    /// with an exception on overflow or failure.
    /// </summary>
    /// <param name="session">Session whose events are read.</param>
    /// <param name="afterWorkerSequence">
    /// Events whose worker sequence is at or below this value are skipped.
    /// </param>
    /// <param name="writer">Destination for the mapped events.</param>
    /// <param name="eventQueued">Called after each event is written successfully.</param>
    /// <param name="cancellationToken">Stops the read loop.</param>
    private async Task ProduceEventsAsync(
        GatewaySession session,
        ulong afterWorkerSequence,
        ChannelWriter writer,
        Action eventQueued,
        CancellationToken cancellationToken)
    {
        try
        {
            await foreach (WorkerEvent incoming in session
                .ReadEventsAsync(cancellationToken)
                .WithCancellation(cancellationToken)
                .ConfigureAwait(false))
            {
                MxEvent mapped = mapper.MapEvent(incoming);

                bool alreadyDelivered = mapped.WorkerSequence <= afterWorkerSequence;
                if (alreadyDelivered)
                {
                    continue;
                }

                if (writer.TryWrite(mapped))
                {
                    eventQueued();
                    continue;
                }

                // The write was rejected: fault the session and end the stream
                // with an overflow error instead of waiting for space.
                string message = $"Session {session.SessionId} event stream queue overflowed.";
                session.MarkFaulted(message);
                metrics.QueueOverflow("grpc-event-stream");
                metrics.Fault(SessionManagerErrorCode.EventQueueOverflow.ToString());

                SessionManagerException overflow = new(
                    SessionManagerErrorCode.EventQueueOverflow,
                    message);
                writer.TryComplete(overflow);
                return;
            }

            writer.TryComplete();
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Requested cancellation is a normal end of stream, not an error.
            writer.TryComplete();
        }
        catch (WorkerClientException workerFailure)
        {
            // A worker client failure also faults the session before the error
            // is handed to the reader.
            session.MarkFaulted(workerFailure.Message);
            metrics.Fault(WorkerClientErrorCode.WorkerFaulted.ToString());
            writer.TryComplete(workerFailure);
        }
        catch (Exception failure)
        {
            writer.TryComplete(failure);
        }
    }
}