feat(sessions): route event streaming through SessionEventDistributor

This commit is contained in:
Joseph Doherty
2026-06-15 13:18:28 -04:00
parent c2c518862f
commit 7f1018bac1
6 changed files with 288 additions and 131 deletions
@@ -1,5 +1,4 @@
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Server.Configuration;
@@ -13,14 +12,33 @@ namespace ZB.MOM.WW.MxGateway.Server.Grpc;
public sealed class EventStreamService(
ISessionManager sessionManager,
IOptions<GatewayOptions> options,
MxAccessGrpcMapper mapper,
GatewayMetrics metrics,
IDashboardEventBroadcaster dashboardEventBroadcaster,
ILogger<EventStreamService> logger) : IEventStreamService
{
/// <summary>
/// Streams events from a session to the client asynchronously.
/// Streams events from a session to the client asynchronously.
/// </summary>
/// <remarks>
/// <para>
/// Task 4 rewired this from a per-RPC channel that drained the session directly
/// to reading the subscriber's lease channel fed by the session's single
/// <see cref="SessionEventDistributor"/> pump. The pump owns the single drain of
/// the worker event stream and the worker→public mapping (mirroring the former
/// <c>ProduceEventsAsync</c>); this loop is the per-subscriber boundary that
/// applies the per-RPC filter (<c>AfterWorkerSequence</c>), the dashboard mirror,
/// queue-depth metrics, and the backpressure/overflow policy.
/// </para>
/// <para>
/// Overflow detection: the distributor's per-subscriber channel is bounded and the
/// pump drops (does not block) on a full channel. Worker sequences are contiguous
/// and the pump preserves order, so a gap between consecutive delivered
/// <see cref="MxEvent.WorkerSequence"/> values means the pump dropped events for
/// this slow subscriber — that is the overflow signal that, before Task 4, was a
/// full per-RPC channel. The FailFast / DisconnectSubscriber semantics are
/// unchanged. Task 5 takes over the per-subscriber isolation policy.
/// </para>
/// </remarks>
/// <param name="request">Stream events request.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Async enumerable of MX events.</returns>
@@ -35,102 +53,62 @@ public sealed class EventStreamService(
$"Session {request.SessionId} was not found.");
}
using IDisposable subscriber = session.AttachEventSubscriber(
using IEventSubscriberLease subscriber = session.AttachEventSubscriber(
options.Value.Sessions.AllowMultipleEventSubscribers);
using CancellationTokenSource streamCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
int streamQueueDepth = 0;
Channel<MxEvent> eventQueue = Channel.CreateBounded<MxEvent>(
new BoundedChannelOptions(options.Value.Events.QueueCapacity)
{
SingleReader = true,
SingleWriter = true,
FullMode = BoundedChannelFullMode.Wait,
AllowSynchronousContinuations = false,
});
Task producerTask = ProduceEventsAsync(
session,
request.AfterWorkerSequence,
eventQueue.Writer,
() =>
{
Interlocked.Increment(ref streamQueueDepth);
metrics.AdjustGrpcEventStreamQueueDepth(1);
},
streamCts.Token);
ulong afterWorkerSequence = request.AfterWorkerSequence;
IAsyncEnumerator<MxEvent> reader = subscriber.Reader
.ReadAllAsync(cancellationToken)
.GetAsyncEnumerator(cancellationToken);
try
{
await foreach (MxEvent mxEvent in eventQueue.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
while (true)
{
Interlocked.Decrement(ref streamQueueDepth);
metrics.AdjustGrpcEventStreamQueueDepth(-1);
yield return mxEvent;
}
MxEvent mxEvent;
try
{
if (!await reader.MoveNextAsync().ConfigureAwait(false))
{
break;
}
await producerTask.ConfigureAwait(false);
}
finally
{
await streamCts.CancelAsync().ConfigureAwait(false);
subscriber.Dispose();
mxEvent = reader.Current;
}
catch (WorkerClientException workerException)
{
// The distributor pump completes every subscriber channel with the source
// fault when the worker event stream terminates abnormally; that surfaces
// here. Mirror the pre-Task-4 ProduceEventsAsync behavior: fault the
// session and record the metric, then propagate the terminal fault to the
// gRPC client.
session.MarkFaulted(workerException.Message);
metrics.Fault(WorkerClientErrorCode.WorkerFaulted.ToString());
throw;
}
try
{
await producerTask.ConfigureAwait(false);
}
catch (OperationCanceledException) when (streamCts.IsCancellationRequested)
{
}
catch (Exception exception)
{
logger.LogDebug(
exception,
"Event stream producer stopped for session {SessionId}.",
request.SessionId);
}
int remainingDepth = Interlocked.Exchange(ref streamQueueDepth, 0);
if (remainingDepth > 0)
{
metrics.AdjustGrpcEventStreamQueueDepth(-remainingDepth);
}
metrics.StreamDisconnected("Detached");
}
}
private async Task ProduceEventsAsync(
GatewaySession session,
ulong afterWorkerSequence,
ChannelWriter<MxEvent> writer,
Action eventQueued,
CancellationToken cancellationToken)
{
try
{
await foreach (WorkerEvent workerEvent in session
.ReadEventsAsync(cancellationToken)
.WithCancellation(cancellationToken)
.ConfigureAwait(false))
{
MxEvent publicEvent = mapper.MapEvent(workerEvent);
if (publicEvent.WorkerSequence <= afterWorkerSequence)
// Per-RPC filter stays at the subscriber boundary: each request may resume
// from a different AfterWorkerSequence, so the shared pump fans raw events and
// this loop drops the ones at or below the caller's watermark. Filtered events
// are not mirrored to the dashboard, matching the pre-Task-4 ordering where
// the skip ran before the dashboard Publish.
if (mxEvent.WorkerSequence <= afterWorkerSequence)
{
continue;
}
// Mirror the event to the dashboard EventsHub group for this
// session. Fire-and-forget — broadcast errors must not affect
// the source gRPC stream. Server-041: the
// IDashboardEventBroadcaster contract documents Publish as
// never-throw, but we enforce that at the seam too, so a
// future implementation that adds synchronous validation or
// a serializer hop cannot fault the producer loop and end
// this client's gRPC stream.
// Mirror the event to the dashboard EventsHub group for this session.
// Fire-and-forget — broadcast errors must not affect the source gRPC stream.
// Server-041: IDashboardEventBroadcaster documents Publish as never-throw,
// but we enforce that at the seam too so a future implementation that adds
// synchronous validation or a serializer hop cannot fault this loop and end
// the client's gRPC stream. (Task 6 will move this tap onto its own
// distributor subscriber; for Task 4 it coexists here, firing once per
// delivered event for the single subscriber exactly as before.)
try
{
dashboardEventBroadcaster.Publish(session.SessionId, publicEvent);
dashboardEventBroadcaster.Publish(session.SessionId, mxEvent);
}
catch (Exception ex)
{
@@ -140,46 +118,34 @@ public sealed class EventStreamService(
session.SessionId);
}
if (!writer.TryWrite(publicEvent))
// Queue-depth gauge tracks events the pump has fanned into this subscriber's
// channel but the client has not yet consumed — the same "buffered, not yet
// delivered" quantity the pre-Task-4 per-RPC channel reported. The bounded
// subscriber channel supports counting, so reconcile the gauge to the current
// backlog; falling back to a no-op delta if a channel ever cannot count.
int backlog = subscriber.Reader.CanCount ? subscriber.Reader.Count : streamQueueDepth;
int delta = backlog - streamQueueDepth;
if (delta != 0)
{
string message = $"Session {session.SessionId} event stream queue overflowed.";
metrics.QueueOverflow("grpc-event-stream");
if (options.Value.Events.BackpressurePolicy == EventBackpressurePolicy.FailFast)
{
session.MarkFaulted(message);
metrics.Fault(SessionManagerErrorCode.EventQueueOverflow.ToString());
}
else
{
logger.LogDebug(
"Disconnecting event stream for session {SessionId} after queue overflow.",
session.SessionId);
}
writer.TryComplete(new SessionManagerException(
SessionManagerErrorCode.EventQueueOverflow,
message));
return;
streamQueueDepth = backlog;
metrics.AdjustGrpcEventStreamQueueDepth(delta);
}
eventQueued();
yield return mxEvent;
}
}
finally
{
await reader.DisposeAsync().ConfigureAwait(false);
subscriber.Dispose();
writer.TryComplete();
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
writer.TryComplete();
}
catch (Exception exception)
{
if (exception is WorkerClientException)
if (streamQueueDepth != 0)
{
session.MarkFaulted(exception.Message);
metrics.Fault(WorkerClientErrorCode.WorkerFaulted.ToString());
metrics.AdjustGrpcEventStreamQueueDepth(-streamQueueDepth);
streamQueueDepth = 0;
}
writer.TryComplete(exception);
metrics.StreamDisconnected("Detached");
}
}
}