using System.Runtime.CompilerServices;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Server.Configuration;
using ZB.MOM.WW.MxGateway.Server.Metrics;
using ZB.MOM.WW.MxGateway.Server.Sessions;
using ZB.MOM.WW.MxGateway.Server.Workers;

namespace ZB.MOM.WW.MxGateway.Server.Grpc;

// NOTE(review): the type argument of IOptions is not visible in this view of the file;
// restore it to the configuration type that exposes Sessions.MaxEventSubscribersPerSession.
/// <summary>
/// Implements <see cref="IEventStreamService"/>: relays a gateway session's events to a
/// single StreamEvents caller through a per-RPC subscriber lease on that session.
/// </summary>
public sealed class EventStreamService(
    ISessionManager sessionManager,
    IOptions options,
    GatewayMetrics metrics) : IEventStreamService
{
    /// <summary>
    /// Streams events from a session to the client asynchronously.
    /// </summary>
    /// <remarks>
    /// <para>
    /// Task 4 rewired this from a per-RPC channel that drained the session directly
    /// to reading the subscriber's lease channel fed by the session's single distributor
    /// pump. That pump owns the single drain of the worker event stream and the
    /// worker→public mapping (mirroring the former <c>ProduceEventsAsync</c>). This loop
    /// is the per-subscriber boundary that applies the per-RPC filter
    /// (<c>AfterWorkerSequence</c>), queue-depth metrics, and the backpressure/overflow
    /// policy.
    /// </para>
    /// <para>
    /// Task 6 moved the dashboard mirror OFF this per-RPC loop. The dashboard is now a
    /// first-class internal subscriber on the session's event distributor (see
    /// <c>GatewaySession.StartDashboardMirror</c>), so it receives session events even
    /// when no gRPC client is streaming. This loop no longer mirrors to the dashboard.
    /// One deliberate consequence: the dashboard now sees RAW session events, not the
    /// per-gRPC-subscriber <c>AfterWorkerSequence</c>-filtered view this loop applies.
    /// The dashboard is a separate LDAP-authenticated monitoring view that should see the
    /// session's full event activity (per-session dashboard ACL is the separate Task 18).
    /// </para>
    /// <para>
    /// Overflow handling (Task 5): the distributor's per-subscriber channel is bounded
    /// and the pump writes non-blocking. When this subscriber's channel is full, the pump
    /// applies the per-subscriber backpressure policy and completes this subscriber's
    /// channel with a terminal overflow exception. That terminal fault surfaces here when
    /// the reader's <c>MoveNextAsync</c> throws. Like the pre-epic per-RPC overflow, it
    /// propagates to the gRPC client unchanged.
    /// </para>
    /// <para>
    /// The overflow metric is recorded by the distributor's overflow handler. In the
    /// legacy single-subscriber FailFast case, that handler also records the session
    /// fault and fault metric. This keeps the session, the pump, and other subscribers
    /// isolated from this subscriber's slowness.
    /// </para>
    /// <para>
    /// Cleanup is layered so that the subscriber lease is released exactly once, and
    /// the queue-depth gauge and disconnect metric are always settled, even if an
    /// earlier cleanup step throws.
    /// </para>
    /// </remarks>
    /// <param name="request">Stream events request.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Async enumerable of MX events.</returns>
    /// <exception cref="SessionManagerException">
    /// Thrown with <c>SessionNotFound</c> (on first enumeration, since this is an async
    /// iterator) when <c>request.SessionId</c> does not name a live session.
    /// </exception>
    public async IAsyncEnumerable<MxEvent> StreamEventsAsync(
        StreamEventsRequest request,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        if (!sessionManager.TryGetSession(request.SessionId, out GatewaySession? session) || session is null)
        {
            throw new SessionManagerException(
                SessionManagerErrorCode.SessionNotFound,
                $"Session {request.SessionId} was not found.");
        }

        // No `using` here. subscriber.Dispose() is called exactly once in the finally
        // block below, after the reader has been disposed. A `using` declaration would add
        // a second Dispose on the same path and double-decrement the session subscriber
        // count.
        //
        // The subscriber mode (single vs. multi) is derived inside AttachEventSubscriber
        // from the session's own SessionEventStreaming.AllowMultipleEventSubscribers field.
        // The distributor uses the same source, so the two cannot diverge.
        //
        // Reconnect/resume (Task 12): when AfterWorkerSequence > 0 the client is resuming.
        // It attaches via the replay variant, which atomically snapshots the replay ring AND
        // registers the live subscriber under one lock.
        //
        // That single critical section is the crux of the no-gap/no-duplicate handoff.
        // Every replayed event has sequence <= LiveResumeSequence, and every live event
        // delivered below is filtered to sequence > LiveResumeSequence. An event that was
        // both replayed and (racing the registration) fanned into the live channel is
        // therefore dropped exactly once, while no newer event is skipped. See
        // SessionEventDistributor.RegisterWithReplay for the full argument.
        //
        // AfterWorkerSequence == 0 (fresh stream, not a resume) keeps the pre-Task-12
        // behavior: a plain attach, no replay, no sentinel, and the live filter watermark
        // stays 0.
        ulong afterWorkerSequence = request.AfterWorkerSequence;
        IEventSubscriberLease subscriber;
        IReadOnlyList<MxEvent> replayedEvents = [];
        bool replayGap = false;
        ulong oldestAvailableSequence = 0;

        if (afterWorkerSequence > 0)
        {
            EventSubscriberReplayAttachment attachment = session.AttachEventSubscriberWithReplay(
                options.Value.Sessions.MaxEventSubscribersPerSession,
                afterWorkerSequence);
            subscriber = attachment.Lease;
            replayedEvents = attachment.ReplayedEvents;
            replayGap = attachment.Gap;
            oldestAvailableSequence = attachment.OldestAvailableSequence;

            // The live filter resumes strictly after the last replayed sequence (or, when
            // nothing was replayed, after the requested watermark). This is what makes the
            // handoff free of duplicates: anything <= this watermark was already replayed.
            afterWorkerSequence = attachment.LiveResumeSequence;
        }
        else
        {
            subscriber = session.AttachEventSubscriber(
                options.Value.Sessions.MaxEventSubscribersPerSession);
        }

        int streamQueueDepth = 0;
        IAsyncEnumerator<MxEvent>? reader = null;
        try
        {
            // Created inside the try so that the lease attached above is still released
            // if obtaining the enumerator throws.
            reader = subscriber.Reader
                .ReadAllAsync(cancellationToken)
                .GetAsyncEnumerator(cancellationToken);

            // Emit order for a resume is:
            //   1. the ReplayGap sentinel (only when events were evicted);
            //   2. the still-retained replay batch;
            //   3. live events.
            // The sentinel is an explicit documented control signal, not a synthesized
            // MXAccess event. It is delivered ONLY to this resuming subscriber: it is never
            // fanned to other subscribers and never appears in DrainEventsReply (that path
            // is untouched).
            if (replayGap)
            {
                yield return CreateReplayGapSentinel(
                    request.SessionId,
                    request.AfterWorkerSequence,
                    oldestAvailableSequence);
            }

            foreach (MxEvent replayedEvent in replayedEvents)
            {
                // Replayed events pass through the SAME per-item filter the live loop
                // applies, so a constrained/resuming caller never sees a replayed event it
                // could not have seen live. The watermark drops events at or below the
                // requested AfterWorkerSequence. The snapshot already excluded those, but
                // this keeps the filter identical for replay and live.
                if (replayedEvent.WorkerSequence <= request.AfterWorkerSequence)
                {
                    continue;
                }

                yield return replayedEvent;
            }

            while (true)
            {
                MxEvent mxEvent;
                try
                {
                    if (!await reader.MoveNextAsync().ConfigureAwait(false))
                    {
                        break;
                    }

                    mxEvent = reader.Current;
                }
                catch (WorkerClientException workerException)
                {
                    // The distributor pump completes every subscriber channel with the
                    // source fault when the worker event stream terminates abnormally; that
                    // surfaces here. Mirror the pre-Task-4 ProduceEventsAsync behavior:
                    // fault the session and record the metric, then propagate the terminal
                    // fault to the gRPC client.
                    session.MarkFaulted(workerException.Message);
                    metrics.Fault(WorkerClientErrorCode.WorkerFaulted.ToString());
                    throw;
                }

                // The per-RPC filter stays at the subscriber boundary. Each request may
                // resume from a different AfterWorkerSequence, so the shared pump fans raw
                // events and this loop drops the ones at or below the caller's watermark.
                if (mxEvent.WorkerSequence <= afterWorkerSequence)
                {
                    continue;
                }

                // The queue-depth gauge tracks events the pump has fanned into this
                // subscriber's channel but the client has not yet consumed. This is the
                // same "buffered, not yet delivered" quantity the pre-Task-4 per-RPC
                // channel reported.
                //
                // The bounded subscriber channel supports counting, so the gauge is
                // reconciled to the current backlog. If a channel ever cannot count, this
                // falls back to a no-op delta.
                int backlog = subscriber.Reader.CanCount ? subscriber.Reader.Count : streamQueueDepth;
                int delta = backlog - streamQueueDepth;
                if (delta != 0)
                {
                    streamQueueDepth = backlog;
                    metrics.AdjustGrpcEventStreamQueueDepth(delta);
                }

                yield return mxEvent;
            }
        }
        finally
        {
            // Each cleanup step is guarded by the next so that a throw from reader disposal
            // cannot skip releasing the lease (which would leak the session subscriber
            // count) or leave the gauge and disconnect metric unsettled.
            try
            {
                if (reader is not null)
                {
                    await reader.DisposeAsync().ConfigureAwait(false);
                }
            }
            finally
            {
                try
                {
                    subscriber.Dispose();
                }
                finally
                {
                    if (streamQueueDepth != 0)
                    {
                        metrics.AdjustGrpcEventStreamQueueDepth(-streamQueueDepth);
                    }

                    metrics.StreamDisconnected("Detached");
                }
            }
        }
    }

    /// <summary>
    /// Builds the single ReplayGap control sentinel emitted at the head of a resumed
    /// StreamEvents stream. It is emitted when the requested <c>AfterWorkerSequence</c>
    /// predates the oldest event still retained (events were evicted).
    /// </summary>
    /// <remarks>
    /// Per the proto contract (<c>MxEvent.replay_gap</c>), the sentinel carries the session
    /// id and the populated <see cref="ReplayGap"/>, with family UNSPECIFIED, no body, and
    /// no per-item fields. It is a documented control signal, NOT a synthesized MXAccess
    /// event, so emitting it does not violate the no-synthesis rule.
    /// </remarks>
    /// <param name="sessionId">Session the resumed stream belongs to.</param>
    /// <param name="requestedAfterSequence">The caller's original <c>AfterWorkerSequence</c>.</param>
    /// <param name="oldestAvailableSequence">Oldest sequence the replay ring still retains.</param>
    /// <returns>The sentinel event.</returns>
    private static MxEvent CreateReplayGapSentinel(
        string sessionId,
        ulong requestedAfterSequence,
        ulong oldestAvailableSequence) =>
        new()
        {
            SessionId = sessionId,
            ReplayGap = new ReplayGap
            {
                RequestedAfterSequence = requestedAfterSequence,
                OldestAvailableSequence = oldestAvailableSequence,
            },
        };
}