using System.Collections.Concurrent;
using System.Threading.Channels;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
namespace ZB.MOM.WW.MxGateway.Server.Sessions;
/// <summary>
/// Invoked by the pump (on the pump thread) when a subscriber's bounded channel is full
/// and the event cannot be written. The handler applies policy side-effects only:
/// it records the overflow metric and, in the legacy single-subscriber FailFast case,
/// faults the owning session. The handler MUST NOT complete the subscriber's channel —
/// the distributor performs the disconnect and channel-completion unconditionally,
/// regardless of what the handler does. Exceptions thrown by the handler are caught and
/// logged by the distributor; they never stall the pump or prevent the disconnect.
/// </summary>
/// <param name="isOnlySubscriber">
/// <see langword="true"/> when FailFast is allowed to fault the whole session for this
/// overflow. As of Task 8 this is gated on the SESSION MODE, not a live count: it is
/// <see langword="true"/> only for an external subscriber in single-subscriber mode
/// (<c>AllowMultipleEventSubscribers == false</c>), where at most one external subscriber
/// can ever exist. In multi-subscriber mode it is always <see langword="false"/>, so
/// FailFast degrades to a per-subscriber disconnect and one slow consumer never faults a
/// session shared by others; gating on the fixed mode also removes the Task 5 race where a
/// concurrent registration could make a count snapshot falsely report a sole subscriber.
/// Always <see langword="false"/> for internal subscribers (the dashboard mirror) so a
/// slow/broken dashboard can never fault the session.
/// </param>
/// <param name="isInternal">
/// <see langword="true"/> when the overflowing subscriber is the gateway-owned internal
/// dashboard mirror subscriber. The handler uses this to choose the correct metric label
/// ("dashboard-mirror" vs "grpc-event-stream").
/// </param>
public delegate void SubscriberOverflowHandler(bool isOnlySubscriber, bool isInternal);
/// <summary>
/// Per-session event pump and fan-out. A single background task drains the
/// session's event source exactly once and fans each event out to
/// every currently-registered subscriber's own bounded channel.
/// </summary>
/// <remarks>
/// <para>
/// Introduced by Task 2 of the Session Resilience epic; the bounded replay ring
/// buffer was added by Task 3, it was wired into GatewaySession and
/// EventStreamService by Task 4, and the per-subscriber backpressure-isolation
/// policy (Task 5) is implemented here: a slow subscriber overflows only its own
/// bounded channel and the pump applies the policy to that subscriber alone (see
/// <see cref="SubscriberOverflowHandler"/> and OnSubscriberOverflow), leaving
/// the pump, the session, and other subscribers running. Task 8 made the
/// FailFast-faults-session decision mode-gated: it fires only in single-subscriber
/// mode (singleSubscriberMode), so multi-subscriber FailFast always degrades to
/// a per-subscriber disconnect — see OnSubscriberOverflow. The ring buffer supports capacity
/// eviction (oldest entry dropped when the count exceeds
/// replayBufferCapacity) and age eviction (entries older than
/// replayRetentionSeconds dropped on the next append or query), and is
/// queried via <see cref="TryGetReplayFrom"/> by reconnecting subscribers.
/// </para>
/// <para>
/// <b>Source seam.</b> The event source is injected as a
/// <see cref="Func{T, TResult}"/> producing an
/// <see cref="IAsyncEnumerable{T}"/> of already-mapped public
/// <see cref="MxEvent"/>s, given a <see cref="CancellationToken"/>. This is the
/// cleanest seam for Task 4: it can pass
/// <c>ct => session.ReadEventsAsync(ct).Select(mapper.MapEvent)</c> (or a
/// channel reader's <c>ReadAllAsync</c>), while unit tests pass a plain
/// channel reader's <c>ReadAllAsync</c> with no real session. The pump owns the
/// single consumption of this enumerable; fan-out happens on the public
/// <see cref="MxEvent"/> after mapping, mirroring today's
/// EventStreamService.ProduceEventsAsync ordering.
/// </para>
/// <para>
/// <b>Concurrency.</b> The subscriber set is a
/// <see cref="ConcurrentDictionary{TKey, TValue}"/> keyed by a monotonic id.
/// The pump iterates it with a snapshot-free enumerator (which never throws on
/// concurrent add/remove), and <see cref="Register"/> / lease disposal mutate it
/// without any lock held across an await. Each subscriber channel has a
/// single writer — the pump — so per-channel writes never race. MXAccess parity:
/// events are fanned in the order received; the pump never reorders or
/// synthesizes events.
/// </para>
/// <para>
/// <b>Termination.</b> Every subscriber channel is completed at most once, through a
/// per-subscriber close flag. A channel already completed by lease disposal or by shutdown
/// is therefore never mistaken by the pump for a full (overflowing) channel. Once the source
/// has ended or faulted, a newly registered subscriber is completed immediately with the
/// same terminal error rather than being left open with no writer.
/// </para>
/// </remarks>
public sealed class SessionEventDistributor : IAsyncDisposable
{
    /// <summary>
    /// Bounded wait for the pump to stop during disposal. A source factory that
    /// ignores cancellation must not hang dispose forever; after this window the
    /// pump is abandoned and subscribers are completed anyway.
    /// </summary>
    private static readonly TimeSpan DefaultShutdownTimeout = TimeSpan.FromSeconds(5);

    private readonly string _sessionId;
    private readonly Func<CancellationToken, IAsyncEnumerable<MxEvent>> _eventSourceFactory;
    private readonly int _subscriberQueueCapacity;
    private readonly bool _singleSubscriberMode;
    private readonly SubscriberOverflowHandler? _overflowHandler;
    private readonly TimeSpan _shutdownTimeout;
    private readonly ILogger _logger;
    private readonly TimeProvider _timeProvider;
    private readonly ConcurrentDictionary<long, Subscriber> _subscribers = new();
    private readonly CancellationTokenSource _shutdownCts = new();
    private readonly object _lifecycleLock = new();

    // Replay ring buffer. Appended on the pump thread and queried from arbitrary
    // threads via TryGetReplayFrom, so every access is under _replayLock. The deque
    // keeps events in ascending WorkerSequence order (the pump fans in source order),
    // so the oldest retained event is always at the front. Capacity == 0 disables
    // retention; RetentionSeconds <= 0 disables age-based eviction.
    private readonly int _replayBufferCapacity;
    private readonly TimeSpan _replayRetention;
    private readonly bool _ageEvictionEnabled;
    private readonly LinkedList<ReplayEntry> _replayBuffer = new();
    private readonly object _replayLock = new();
    private bool _anyEventSeen;
    private ulong _highestSequenceSeen;

    private long _nextSubscriberId;
    private Task? _pumpTask;
    private bool _started;
    private bool _disposed;

    // Guarded by _lifecycleLock. Set the first time CompleteAllSubscribers runs (source
    // ended, source faulted, or disposal). From then on no pump will ever write to or
    // complete a newly registered subscriber, so Register completes it immediately with
    // _completionError instead of leaving its reader waiting forever.
    private bool _subscribersCompleted;
    private Exception? _completionError;

    /// <summary>
    /// Initializes a per-session event distributor.
    /// </summary>
    /// <param name="sessionId">Owning session id, used only for logging context.</param>
    /// <param name="eventSourceFactory">
    /// Factory producing the session's event stream given a cancellation token.
    /// The pump consumes this exactly once. See the type remarks for the seam Task 4
    /// plugs into.
    /// </param>
    /// <param name="subscriberQueueCapacity">
    /// Bounded capacity of each per-subscriber channel. Mirrors the gRPC event-stream
    /// queue capacity shape used today.
    /// </param>
    /// <param name="logger">Logger for pump lifecycle diagnostics.</param>
    /// <param name="overflowHandler">
    /// Optional per-subscriber backpressure handler; see the public constructor.
    /// </param>
    /// <param name="singleSubscriberMode">
    /// Session-mode gate for the FailFast session-fault decision; see the public constructor.
    /// </param>
    /// <remarks>
    /// This overload disables the replay ring buffer (capacity 0). Use the overload
    /// taking replay parameters to retain events for reconnect/reattach replay.
    /// Kept internal so production wiring (Task 4) cannot accidentally use
    /// the no-replay path; tests reach it via InternalsVisibleTo.
    /// </remarks>
    internal SessionEventDistributor(
        string sessionId,
        Func<CancellationToken, IAsyncEnumerable<MxEvent>> eventSourceFactory,
        int subscriberQueueCapacity,
        ILogger logger,
        SubscriberOverflowHandler? overflowHandler = null,
        bool singleSubscriberMode = true)
        : this(
            sessionId,
            eventSourceFactory,
            subscriberQueueCapacity,
            replayBufferCapacity: 0,
            replayRetentionSeconds: 0,
            logger,
            TimeProvider.System,
            overflowHandler,
            singleSubscriberMode)
    {
    }

    /// <summary>
    /// Initializes a per-session event distributor with a bounded replay ring buffer.
    /// </summary>
    /// <param name="sessionId">Owning session id, used only for logging context.</param>
    /// <param name="eventSourceFactory">
    /// Factory producing the session's event stream given a cancellation token.
    /// The pump consumes this exactly once. See the type remarks for the seam Task 4
    /// plugs into.
    /// </param>
    /// <param name="subscriberQueueCapacity">
    /// Bounded capacity of each per-subscriber channel. Mirrors the gRPC event-stream
    /// queue capacity shape used today. Must be at least 1.
    /// </param>
    /// <param name="replayBufferCapacity">
    /// Maximum number of events retained for replay. The oldest retained event is
    /// evicted once this count is exceeded. 0 disables retention entirely.
    /// </param>
    /// <param name="replayRetentionSeconds">
    /// Maximum age, in seconds, of a retained event. Entries older than this are
    /// evicted regardless of capacity. 0 disables age-based eviction; negative values
    /// are rejected.
    /// </param>
    /// <param name="logger">Logger for pump lifecycle diagnostics.</param>
    /// <param name="timeProvider">
    /// Clock used to timestamp and age-evict replay entries. Inject a fake to make
    /// age-eviction deterministic in tests.
    /// </param>
    /// <param name="overflowHandler">
    /// Optional per-subscriber backpressure handler invoked when a subscriber's bounded
    /// channel is full. It records the overflow metric and, for the legacy
    /// single-subscriber FailFast case, faults the owning session. The distributor always
    /// disconnects the offending subscriber with an overflow fault regardless of the
    /// handler. When <see langword="null"/> (unit/skeleton use) the offending subscriber is
    /// still disconnected but no metric/fault side effect runs.
    /// </param>
    /// <param name="singleSubscriberMode">
    /// <see langword="true"/> when the owning session is in single-subscriber mode
    /// (<c>AllowMultipleEventSubscribers == false</c>). This gates the FailFast
    /// session-fault decision in OnSubscriberOverflow: an external subscriber that
    /// overflows reports <c>isOnlySubscriber == true</c> (legacy FailFast faults the
    /// session) ONLY in single-subscriber mode. In multi-subscriber mode it is always
    /// <see langword="false"/>, so FailFast degrades to a per-subscriber disconnect and a
    /// transient registration race can never falsely fault a shared session (Task 8;
    /// resolves the Task 5 REVISIT race). Defaults to <see langword="true"/> so existing
    /// call sites and unit tests keep legacy single-subscriber FailFast behavior.
    /// </param>
    public SessionEventDistributor(
        string sessionId,
        Func<CancellationToken, IAsyncEnumerable<MxEvent>> eventSourceFactory,
        int subscriberQueueCapacity,
        int replayBufferCapacity,
        double replayRetentionSeconds,
        ILogger logger,
        TimeProvider timeProvider,
        SubscriberOverflowHandler? overflowHandler = null,
        bool singleSubscriberMode = true)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(sessionId);
        ArgumentNullException.ThrowIfNull(eventSourceFactory);
        ArgumentOutOfRangeException.ThrowIfLessThan(subscriberQueueCapacity, 1);
        ArgumentOutOfRangeException.ThrowIfNegative(replayBufferCapacity);
        ArgumentOutOfRangeException.ThrowIfNegative(replayRetentionSeconds);
        ArgumentNullException.ThrowIfNull(logger);
        ArgumentNullException.ThrowIfNull(timeProvider);

        _sessionId = sessionId;
        _eventSourceFactory = eventSourceFactory;
        _subscriberQueueCapacity = subscriberQueueCapacity;
        _singleSubscriberMode = singleSubscriberMode;
        _overflowHandler = overflowHandler;
        _shutdownTimeout = DefaultShutdownTimeout;
        _replayBufferCapacity = replayBufferCapacity;
        _ageEvictionEnabled = replayRetentionSeconds > 0;
        _replayRetention = _ageEvictionEnabled
            ? TimeSpan.FromSeconds(replayRetentionSeconds)
            : TimeSpan.Zero;
        _logger = logger;
        _timeProvider = timeProvider;
    }

    /// <summary>
    /// Gets the count of currently-registered subscribers. This count INCLUDES internal
    /// subscribers (e.g. the gateway-owned dashboard mirror registered via
    /// <c>Register(isInternal: true)</c>). It therefore differs from the session-level
    /// external subscriber count, which tracks only external (gRPC) subscribers and
    /// excludes the internal dashboard subscriber. A subscriber registered after the event
    /// source has already ended is completed immediately and is not counted.
    /// </summary>
    public int SubscriberCount => _subscribers.Count;

    /// <summary>
    /// Starts the background pump. Idempotent — a second call is a no-op.
    /// </summary>
    /// <param name="cancellationToken">Token observed only while starting.</param>
    /// <returns>An already-completed task; the pump runs in the background.</returns>
    /// <exception cref="ObjectDisposedException">The distributor has been disposed.</exception>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();
        lock (_lifecycleLock)
        {
            ObjectDisposedException.ThrowIf(_disposed, this);
            if (_started)
            {
                return Task.CompletedTask;
            }

            _started = true;

            // Read the token now, while the CTS is guaranteed alive. Reading it inside the
            // Task.Run lambda could run after DisposeAsync's bounded wait expired and the
            // CTS was disposed, which would throw ObjectDisposedException on the pool thread.
            CancellationToken shutdownToken = _shutdownCts.Token;
            _pumpTask = Task.Run(() => PumpAsync(shutdownToken), CancellationToken.None);
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Registers a new subscriber and returns its lease. The lease exposes the
    /// subscriber's <see cref="ChannelReader{T}"/> and, when disposed, unregisters the
    /// subscriber and completes its channel without disturbing the pump or other
    /// subscribers. If the event source has already ended or faulted, the returned
    /// reader is already completed (with the source's fault, if any).
    /// </summary>
    /// <param name="isInternal">
    /// <see langword="true"/> for a gateway-owned internal subscriber (Task 6: the
    /// session's dashboard mirror) that must NOT participate in the single-subscriber
    /// overflow accounting. An internal subscriber is excluded from the
    /// <c>isOnlySubscriber</c> count, so a lone external gRPC subscriber still reports
    /// <c>isOnlySubscriber == true</c> (preserving legacy FailFast session-fault
    /// behavior) even while the dashboard subscriber is attached; and an internal
    /// subscriber that itself overflows always reports <c>isOnlySubscriber == false</c>,
    /// so a slow/broken dashboard can never fault the session — it is merely
    /// disconnected from the mirror. Defaults to <see langword="false"/> (external
    /// subscriber) so every existing call site is unchanged.
    /// </param>
    /// <returns>The subscriber's lease.</returns>
    /// <exception cref="ObjectDisposedException">The distributor has been disposed.</exception>
    public IEventSubscriberLease Register(bool isInternal = false)
    {
        // The pump is the single writer for this channel; readers are single-consumer
        // (one gRPC stream / dashboard subscriber). Synchronous continuations are
        // disabled so a slow reader can never stall the pump on its completion.
        //
        // The pump MUST stay non-blocking: it writes with the non-blocking TryWrite so one
        // slow reader can never stall the single pump that feeds every subscriber. FullMode
        // is deliberately Wait — NOT because the pump ever blocks (it never calls the blocking
        // WriteAsync overload), but because Wait is the only BoundedChannelFullMode under
        // which TryWrite returns false when the channel is full. That false return IS the
        // overflow signal the pump needs to apply the per-subscriber backpressure policy. The
        // Drop* modes would make TryWrite silently succeed-and-drop, hiding overflow and
        // re-introducing the silent data loss this task removes. So: Wait mode + TryWrite =
        // a non-blocking pump that still detects a full subscriber channel.
        Channel<MxEvent> channel = Channel.CreateBounded<MxEvent>(
            new BoundedChannelOptions(_subscriberQueueCapacity)
            {
                SingleReader = true,
                SingleWriter = true,
                FullMode = BoundedChannelFullMode.Wait,
                AllowSynchronousContinuations = false,
            });

        long id = Interlocked.Increment(ref _nextSubscriberId);
        Subscriber subscriber = new(id, channel, isInternal);

        // The disposed/completed checks AND the map add happen under the same lock with no
        // await in between. DisposeAsync sets _disposed=true under this lock, and
        // CompleteAllSubscribers sets _subscribersCompleted under this lock BEFORE it sweeps
        // the map. So a subscriber is either added before the sweep begins (and is then
        // completed by it) or sees the completed state here and is finished immediately —
        // no registration can slip through with a channel that is never completed.
        lock (_lifecycleLock)
        {
            ObjectDisposedException.ThrowIf(_disposed, this);
            if (_subscribersCompleted)
            {
                subscriber.TryClose(_completionError);
            }
            else
            {
                _subscribers[id] = subscriber;
            }
        }

        return new SubscriberLease(this, subscriber);
    }

    /// <summary>
    /// Stops the pump and completes all subscriber channels. Idempotent. Never throws on
    /// the pump-shutdown path: a pump that does not stop within the shutdown timeout is
    /// logged and abandoned.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        Task? pumpTask;
        lock (_lifecycleLock)
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;
            pumpTask = _pumpTask;
        }

        // Signal the pump to stop. It must not block on a non-reading subscriber:
        // it writes with non-blocking TryWrite, so cancellation tears it down promptly.
        await _shutdownCts.CancelAsync().ConfigureAwait(false);

        if (pumpTask is not null)
        {
            // Bound the wait: a source factory that ignores cancellation would otherwise
            // hang dispose forever. If the pump does not stop in time we log and proceed
            // to complete subscribers anyway; DisposeAsync must not throw on this path.
            Task completed = await Task.WhenAny(pumpTask, Task.Delay(_shutdownTimeout)).ConfigureAwait(false);
            if (!ReferenceEquals(completed, pumpTask))
            {
                _logger.LogWarning(
                    "Event distributor pump did not stop within {ShutdownTimeoutSeconds}s for session {SessionId}; completing subscribers and abandoning the pump.",
                    _shutdownTimeout.TotalSeconds,
                    _sessionId);
            }
            else
            {
                try
                {
                    await pumpTask.ConfigureAwait(false);
                }
                catch (OperationCanceledException)
                {
                    // Expected on the shutdown path.
                }
                catch (Exception exception)
                {
                    _logger.LogDebug(
                        exception,
                        "Event distributor pump faulted during shutdown for session {SessionId}.",
                        _sessionId);
                }
            }
        }

        CompleteAllSubscribers(error: null);
        _shutdownCts.Dispose();
    }

    // Drains the event source exactly once, retaining each event for replay and fanning
    // it out to every registered subscriber. Completes all subscribers when the source
    // ends (cleanly) or faults (with the fault); on shutdown cancellation DisposeAsync
    // completes them instead.
    private async Task PumpAsync(CancellationToken cancellationToken)
    {
        try
        {
            await foreach (MxEvent mxEvent in _eventSourceFactory(cancellationToken)
                .WithCancellation(cancellationToken)
                .ConfigureAwait(false))
            {
                // A source that ignores cancellation can keep yielding after DisposeAsync
                // signalled shutdown. Stop here so an abandoned pump does not keep appending
                // and fanning out after the subscribers were completed.
                cancellationToken.ThrowIfCancellationRequested();

                // Retain for replay BEFORE fan-out so a reconnecting subscriber that
                // queries between fan-out and its own read still sees this event. Order
                // is preserved: the pump is the single appender and events arrive in
                // source order.
                AppendToReplayBuffer(mxEvent);

                // Enumerating a ConcurrentDictionary's Values never throws on concurrent
                // add/remove; a subscriber registered mid-iteration may miss this event,
                // which matches "late subscribers see events after they register".
                foreach (Subscriber subscriber in _subscribers.Values)
                {
                    // Non-blocking write: TryWrite never blocks the pump on a slow reader.
                    // A false return normally means this subscriber's bounded channel is
                    // full — the per-subscriber overflow signal. But TryWrite ALSO returns
                    // false when the channel was already completed (lease disposed
                    // concurrently with this fan-out, or shutdown completed it); that is not
                    // an overflow and must never invoke the overflow policy. TryClose sets
                    // the close flag before completing, so a write that failed because of a
                    // completion always observes IsClosed == true here.
                    // Logs identifiers (worker sequence, subscriber id, session) only, never
                    // the event payload or tag values.
                    if (!subscriber.Channel.Writer.TryWrite(mxEvent) && !subscriber.IsClosed)
                    {
                        OnSubscriberOverflow(subscriber, mxEvent.WorkerSequence);
                    }
                }
            }

            CompleteAllSubscribers(error: null);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Shutdown path: DisposeAsync completes subscribers.
        }
        catch (Exception exception)
        {
            // Unexpected source fault (not the shutdown-cancellation path above) — visible
            // by default so an event stream silently dying is not lost in Debug noise.
            _logger.LogError(
                exception,
                "Event distributor source faulted for session {SessionId}.",
                _sessionId);
            CompleteAllSubscribers(exception);
        }
    }

    // Applies the per-subscriber backpressure policy when a subscriber's bounded channel is
    // full. Runs on the pump thread. The offending subscriber is ALWAYS disconnected with an
    // overflow fault and unregistered, so it can never wedge the pump again; the overflow
    // handler decides the observable side effects (overflow metric, and — for legacy
    // single-subscriber FailFast — faulting the owning session). Multi-subscriber FailFast
    // intentionally degrades to a plain disconnect (see SubscriberOverflowHandler docs): one
    // slow consumer must not fault a session shared by other healthy subscribers.
    private void OnSubscriberOverflow(Subscriber subscriber, ulong workerSequence)
    {
        // Claim the disconnect first. If the subscriber is no longer registered, its lease
        // was disposed concurrently with this fan-out: it is leaving on its own, so there is
        // no overflow to report and the handler must not run (in single-subscriber mode it
        // would otherwise fault the session for a client that simply disconnected).
        if (!_subscribers.TryRemove(subscriber.Id, out _))
        {
            return;
        }

        // Decide whether FailFast may fault the whole session for this overflow. This is the
        // "isOnlySubscriber" signal the legacy single-subscriber FailFast path keys on.
        //
        // Task 8 resolution of the Task 5/7 REVISIT race: gate this on the SESSION MODE
        // (_singleSubscriberMode), NOT on a live count snapshot. The old
        // `CountExternalSubscribers() == 1` snapshot raced once multi-subscriber became real —
        // a concurrent second registration/unregistration could make the count read as 1 with
        // two subscribers actually present, producing a false FailFast that faults a shared
        // session. The mode is fixed for the session's lifetime, so reading it is race-free:
        //  - single-subscriber mode: at most one external subscriber can ever exist (the
        //    AttachEventSubscriber guard enforces it), so an overflowing external subscriber
        //    IS the sole subscriber — preserve the legacy FailFast session-fault behavior.
        //  - multi-subscriber mode: never fault the shared session; FailFast degrades to a
        //    per-subscriber disconnect so one slow consumer cannot punish healthy ones.
        //
        // Task 6: the gateway-owned internal dashboard subscriber is excluded — an internal
        // subscriber that overflows is NEVER the "only subscriber", so a slow/broken dashboard
        // can only disconnect its own mirror and never fault the session.
        bool isOnlySubscriber = !subscriber.IsInternal && _singleSubscriberMode;

        _logger.LogDebug(
            "Event distributor disconnecting subscriber {SubscriberId} in session {SessionId} after queue overflow (worker sequence {WorkerSequence}).",
            subscriber.Id,
            _sessionId,
            workerSequence);

        // Observability + session-fault decision. Errors here must not stall the pump or
        // leave the subscriber attached, so the disconnect below runs regardless.
        // Pass subscriber.IsInternal so the handler can choose the correct metric label.
        try
        {
            _overflowHandler?.Invoke(isOnlySubscriber, subscriber.IsInternal);
        }
        catch (Exception exception)
        {
            _logger.LogError(
                exception,
                "Event distributor overflow handler threw for session {SessionId}; disconnecting subscriber {SubscriberId} anyway.",
                _sessionId,
                subscriber.Id);
        }

        // Disconnect ONLY this subscriber (already removed from the fan-out set above):
        // complete its channel with the overflow fault. Its gRPC reader's MoveNextAsync then
        // throws the SessionManagerException, which EventStreamService surfaces to the client
        // exactly as the pre-epic per-RPC overflow did. Because it was removed before the
        // handler ran, a shutdown triggered from the handler cannot complete it cleanly first
        // and mask the overflow fault. The pump and every other subscriber are untouched.
        subscriber.TryClose(new SessionManagerException(
            SessionManagerErrorCode.EventQueueOverflow,
            $"Session {_sessionId} event stream queue overflowed."));
    }

    // Completes every registered subscriber's channel (with error, if any) and records the
    // terminal state so later registrations are completed immediately. Only the first
    // call's error is recorded; each channel is completed at most once.
    private void CompleteAllSubscribers(Exception? error)
    {
        lock (_lifecycleLock)
        {
            if (!_subscribersCompleted)
            {
                _subscribersCompleted = true;
                _completionError = error;
            }
        }

        // Runs after the flag is published: any subscriber Register added before that point
        // is present in the map for the whole sweep and is therefore visited.
        foreach (Subscriber subscriber in _subscribers.Values)
        {
            subscriber.TryClose(error);
        }
    }

    // Lease-disposal path: removes the subscriber from the fan-out set and completes its
    // channel cleanly. A no-op if the subscriber was already removed (e.g. by overflow).
    private void Unregister(Subscriber subscriber)
    {
        if (_subscribers.TryRemove(subscriber.Id, out _))
        {
            subscriber.TryClose(error: null);
        }
    }

    /// <summary>
    /// Returns the retained events with <c>WorkerSequence</c> strictly
    /// greater than <paramref name="afterSequence"/>, in ascending sequence order, so a
    /// reconnecting or reattaching subscriber can replay what it missed.
    /// </summary>
    /// <param name="afterSequence">
    /// The last worker sequence the caller already observed. Only events newer than this
    /// are returned.
    /// </param>
    /// <param name="events">
    /// The retained events newer than <paramref name="afterSequence"/>, in order. Never
    /// null; empty when nothing newer is retained.
    /// </param>
    /// <param name="gap">
    /// <see langword="true"/> when events between <paramref name="afterSequence"/> and the
    /// oldest retained event were already evicted (by capacity or age), meaning the caller
    /// missed events that can no longer be replayed and must re-snapshot. When
    /// <see langword="true"/>, whatever IS still retained is still returned via
    /// <paramref name="events"/>.
    /// </param>
    /// <returns>
    /// Always <see langword="true"/> — the out parameters fully describe the result. The
    /// return value exists for a fluent call shape and future extension.
    /// </returns>
    /// <remarks>
    /// Gap semantics, by buffer state:
    /// <list type="bullet">
    /// <item>
    /// <description>
    /// Buffer non-empty: <paramref name="gap"/> is <see langword="true"/> iff
    /// <paramref name="afterSequence"/> is below the oldest retained sequence minus
    /// one (i.e. at least one event newer than <paramref name="afterSequence"/> but
    /// older than the oldest retained was evicted). When
    /// <paramref name="afterSequence"/> equals or exceeds the newest retained
    /// sequence the caller is fully caught up: empty list, no gap.
    /// </description>
    /// </item>
    /// <item>
    /// <description>
    /// Buffer empty (retention disabled, nothing seen yet, or everything evicted):
    /// empty list, and <paramref name="gap"/> is <see langword="true"/> iff
    /// <paramref name="afterSequence"/> is below the highest sequence ever seen —
    /// i.e. the caller is behind but nothing is retained to replay. If no event has
    /// ever been seen, or the caller is already at/ahead of the highest seen, there
    /// is nothing to miss: no gap.
    /// </description>
    /// </item>
    /// </list>
    /// </remarks>
    public bool TryGetReplayFrom(ulong afterSequence, out IReadOnlyList<MxEvent> events, out bool gap)
    {
        lock (_replayLock)
        {
            EvictAged();

            if (_replayBuffer.Count == 0)
            {
                events = [];
                // Nothing retained. The caller missed events only if it is behind the
                // highest sequence ever seen (and we have seen at least one event).
                gap = _anyEventSeen && afterSequence < _highestSequenceSeen;
                return true;
            }

            ulong oldestRetained = _replayBuffer.First!.Value.Event.WorkerSequence;

            // A gap exists when at least one event newer than afterSequence was evicted,
            // i.e. afterSequence sits below the oldest-retained-minus-one boundary.
            // Written as (oldestRetained > 0 && afterSequence < oldestRetained - 1) to
            // avoid wrapping when afterSequence == ulong.MaxValue (afterSequence + 1
            // would overflow to 0, falsely reporting a gap).
            gap = oldestRetained > 0 && afterSequence < oldestRetained - 1;

            // O(n) scan over the retained buffer — acceptable because TryGetReplayFrom
            // is only called on subscriber reconnect, never on the hot fan-out path.
            List<MxEvent> newer = [];
            foreach (ReplayEntry entry in _replayBuffer)
            {
                if (entry.Event.WorkerSequence > afterSequence)
                {
                    newer.Add(entry.Event);
                }
            }

            events = newer;
            return true;
        }
    }

    // Records the event for replay (pump thread only): updates the highest-seen sequence,
    // appends it when retention is enabled, then applies capacity and age eviction.
    private void AppendToReplayBuffer(MxEvent mxEvent)
    {
        lock (_replayLock)
        {
            _anyEventSeen = true;
            if (mxEvent.WorkerSequence > _highestSequenceSeen)
            {
                _highestSequenceSeen = mxEvent.WorkerSequence;
            }

            // Capacity 0 disables retention: track the highest-seen sequence (so replay
            // can still report a gap) but keep no events.
            if (_replayBufferCapacity == 0)
            {
                return;
            }

            _replayBuffer.AddLast(new ReplayEntry(mxEvent, _timeProvider.GetUtcNow()));

            // Capacity eviction: drop oldest until within bound.
            while (_replayBuffer.Count > _replayBufferCapacity)
            {
                _replayBuffer.RemoveFirst();
            }

            EvictAged();
        }
    }

    // Must be called under _replayLock. Drops entries older than the retention window.
    private void EvictAged()
    {
        if (!_ageEvictionEnabled || _replayBuffer.Count == 0)
        {
            return;
        }

        DateTimeOffset cutoff = _timeProvider.GetUtcNow() - _replayRetention;
        while (_replayBuffer.First is { } first && first.Value.RetainedAt < cutoff)
        {
            _replayBuffer.RemoveFirst();
        }
    }

    // One retained event plus the (injected-clock) time it was retained, for age eviction.
    private readonly record struct ReplayEntry(MxEvent Event, DateTimeOffset RetainedAt);

    private sealed class Subscriber(long id, Channel<MxEvent> channel, bool isInternal)
    {
        // 0 = open, 1 = closed. Flipped exactly once by TryClose.
        private int _closed;

        public long Id { get; } = id;

        public Channel<MxEvent> Channel { get; } = channel;

        // True for the gateway-owned internal dashboard subscriber. Excluded from the
        // single-subscriber overflow accounting so it cannot fault the session.
        public bool IsInternal { get; } = isInternal;

        // True once TryClose has run (the channel is completed or being completed).
        public bool IsClosed => Volatile.Read(ref _closed) != 0;

        // Completes the channel with error (null = clean completion) exactly once; the first
        // caller's error wins. The flag is set BEFORE completing so a writer whose TryWrite
        // failed because of this completion is guaranteed to observe IsClosed == true.
        public bool TryClose(Exception? error)
        {
            if (Interlocked.Exchange(ref _closed, 1) != 0)
            {
                return false;
            }

            return Channel.Writer.TryComplete(error);
        }
    }

    private sealed class SubscriberLease(SessionEventDistributor distributor, Subscriber subscriber)
        : IEventSubscriberLease
    {
        private int _leaseDisposed;

        public ChannelReader<MxEvent> Reader => subscriber.Channel.Reader;

        public void Dispose()
        {
            // Atomic check-and-set so concurrent Dispose calls unregister at most once.
            if (Interlocked.Exchange(ref _leaseDisposed, 1) == 0)
            {
                distributor.Unregister(subscriber);
            }
        }
    }
}