From c79b2929681248ebbaf6fc4cb9bd0a4f66c9f1ed Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 15 Jun 2026 12:32:13 -0400 Subject: [PATCH] feat(sessions): add SessionEventDistributor (pump + per-subscriber fan-out skeleton) --- .../Sessions/IEventSubscriberLease.cs | 18 ++ .../Sessions/SessionEventDistributor.cs | 273 ++++++++++++++++++ .../Sessions/SessionEventDistributorTests.cs | 126 ++++++++ 3 files changed, 417 insertions(+) create mode 100644 src/ZB.MOM.WW.MxGateway.Server/Sessions/IEventSubscriberLease.cs create mode 100644 src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs create mode 100644 src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/IEventSubscriberLease.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/IEventSubscriberLease.cs new file mode 100644 index 0000000..fb964e7 --- /dev/null +++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/IEventSubscriberLease.cs @@ -0,0 +1,18 @@ +using System.Threading.Channels; +using ZB.MOM.WW.MxGateway.Contracts.Proto; + +namespace ZB.MOM.WW.MxGateway.Server.Sessions; + +/// +/// A registration lease into a . Exposes the +/// subscriber's own of fanned events. Disposing the +/// lease unregisters the subscriber and completes its channel without disturbing the +/// pump or other subscribers. +/// +public interface IEventSubscriberLease : IDisposable +{ + /// + /// Gets the reader for this subscriber's fanned event channel. + /// + ChannelReader Reader { get; } +} diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs new file mode 100644 index 0000000..f4f1d60 --- /dev/null +++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs @@ -0,0 +1,273 @@ +using System.Collections.Concurrent; +using System.Threading.Channels; +using ZB.MOM.WW.MxGateway.Contracts.Proto; + +namespace ZB.MOM.WW.MxGateway.Server.Sessions; + +/// +/// Per-session event pump and fan-out. A single background task drains the +/// session's event source exactly once and fans each event out to +/// every currently-registered subscriber's own bounded channel. +/// +/// +/// +/// This is the skeleton introduced by Task 2 of the Session Resilience epic. +/// It is a standalone class — it is NOT yet wired into GatewaySession or +/// EventStreamService (Task 4), it has no replay ring buffer (Task 3), +/// no per-subscriber backpressure-isolation policy (Task 5), and it does not +/// remove the single-subscriber guard (Tasks 7/8). +/// +/// +/// Source seam. The event source is injected as a +/// producing an +/// of already-mapped public +/// s, given a . This is the +/// cleanest seam for Task 4: it can pass +/// ct => session.ReadEventsAsync(ct).Select(mapper.MapEvent) (or a +/// channel reader's ReadAllAsync), while unit tests pass a plain +/// channel reader's ReadAllAsync with no real session. The pump owns the +/// single consumption of this enumerable; fan-out happens on the public +/// after mapping, mirroring today's +/// EventStreamService.ProduceEventsAsync ordering. +/// +/// +/// Concurrency. The subscriber set is a +/// keyed by a monotonic id. +/// The pump iterates it with a snapshot-free enumerator (which never throws on +/// concurrent add/remove), and / lease disposal mutate it +/// without any lock held across an await. Each subscriber channel has a +/// single writer — the pump — so per-channel writes never race. MXAccess parity: +/// events are fanned in the order received; the pump never reorders or +/// synthesizes events. +/// +/// +public sealed class SessionEventDistributor : IAsyncDisposable +{ + private readonly string _sessionId; + private readonly Func> _eventSourceFactory; + private readonly int _subscriberQueueCapacity; + private readonly ILogger _logger; + private readonly ConcurrentDictionary _subscribers = new(); + private readonly CancellationTokenSource _shutdownCts = new(); + private readonly object _lifecycleLock = new(); + + private long _nextSubscriberId; + private Task? _pumpTask; + private bool _started; + private bool _disposed; + + /// + /// Initializes a per-session event distributor. + /// + /// Owning session id, used only for logging context. + /// + /// Factory producing the session's event stream given a cancellation token. + /// The pump consumes this exactly once. See the type remarks for the seam Task 4 + /// plugs into. + /// + /// + /// Bounded capacity of each per-subscriber channel. Mirrors the gRPC event-stream + /// queue capacity shape used today. + /// + /// Logger for pump lifecycle diagnostics. + public SessionEventDistributor( + string sessionId, + Func> eventSourceFactory, + int subscriberQueueCapacity, + ILogger logger) + { + ArgumentException.ThrowIfNullOrWhiteSpace(sessionId); + ArgumentNullException.ThrowIfNull(eventSourceFactory); + ArgumentOutOfRangeException.ThrowIfLessThan(subscriberQueueCapacity, 1); + ArgumentNullException.ThrowIfNull(logger); + + _sessionId = sessionId; + _eventSourceFactory = eventSourceFactory; + _subscriberQueueCapacity = subscriberQueueCapacity; + _logger = logger; + } + + /// + /// Gets the count of currently-registered subscribers. + /// + public int SubscriberCount => _subscribers.Count; + + /// + /// Starts the background pump. Idempotent — a second call is a no-op. + /// + /// Token observed only while starting. + public Task StartAsync(CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + + lock (_lifecycleLock) + { + ObjectDisposedException.ThrowIf(_disposed, this); + if (_started) + { + return Task.CompletedTask; + } + + _started = true; + _pumpTask = Task.Run(() => PumpAsync(_shutdownCts.Token), CancellationToken.None); + } + + return Task.CompletedTask; + } + + /// + /// Registers a new subscriber and returns its lease. The lease exposes the + /// subscriber's and, when disposed, unregisters the + /// subscriber and completes its channel without disturbing the pump or other + /// subscribers. + /// + public IEventSubscriberLease Register() + { + lock (_lifecycleLock) + { + ObjectDisposedException.ThrowIf(_disposed, this); + } + + // The pump is the single writer for this channel; readers are single-consumer + // (one gRPC stream / dashboard subscriber). Synchronous continuations are + // disabled so a slow reader can never stall the pump on its completion. + Channel channel = Channel.CreateBounded( + new BoundedChannelOptions(_subscriberQueueCapacity) + { + SingleReader = true, + SingleWriter = true, + FullMode = BoundedChannelFullMode.Wait, + AllowSynchronousContinuations = false, + }); + + long id = Interlocked.Increment(ref _nextSubscriberId); + Subscriber subscriber = new(id, channel); + _subscribers[id] = subscriber; + + // Disposal between the add and a concurrent DisposeAsync: DisposeAsync drains + // and completes every subscriber currently in the map, so this entry is covered. + return new SubscriberLease(this, subscriber); + } + + /// + /// Stops the pump and completes all subscriber channels. Idempotent. + /// + public async ValueTask DisposeAsync() + { + Task? pumpTask; + lock (_lifecycleLock) + { + if (_disposed) + { + return; + } + + _disposed = true; + pumpTask = _pumpTask; + } + + // Signal the pump to stop. It must not block on a non-reading subscriber: + // it writes with non-blocking TryWrite, so cancellation tears it down promptly. + await _shutdownCts.CancelAsync().ConfigureAwait(false); + + if (pumpTask is not null) + { + try + { + await pumpTask.ConfigureAwait(false); + } + catch (OperationCanceledException) + { + } + catch (Exception exception) + { + _logger.LogDebug( + exception, + "Event distributor pump faulted during shutdown for session {SessionId}.", + _sessionId); + } + } + + CompleteAllSubscribers(error: null); + _shutdownCts.Dispose(); + } + + private async Task PumpAsync(CancellationToken cancellationToken) + { + try + { + await foreach (MxEvent mxEvent in _eventSourceFactory(cancellationToken) + .WithCancellation(cancellationToken) + .ConfigureAwait(false)) + { + // Enumerating a ConcurrentDictionary's Values never throws on concurrent + // add/remove; a subscriber registered mid-iteration may miss this event, + // which matches "late subscribers see events after they register". + foreach (Subscriber subscriber in _subscribers.Values) + { + // TODO(Task 5): define overflow policy (per-subscriber isolation — + // drop / disconnect / fault that one subscriber). For the Task 2 + // skeleton, a non-blocking TryWrite that silently drops on a full + // channel is the placeholder so one slow reader never stalls the pump. + subscriber.Channel.Writer.TryWrite(mxEvent); + } + } + + CompleteAllSubscribers(error: null); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + // Shutdown path: DisposeAsync completes subscribers. + } + catch (Exception exception) + { + _logger.LogDebug( + exception, + "Event distributor source faulted for session {SessionId}.", + _sessionId); + CompleteAllSubscribers(exception); + } + } + + private void CompleteAllSubscribers(Exception? error) + { + foreach (Subscriber subscriber in _subscribers.Values) + { + subscriber.Channel.Writer.TryComplete(error); + } + } + + private void Unregister(Subscriber subscriber) + { + if (_subscribers.TryRemove(subscriber.Id, out _)) + { + subscriber.Channel.Writer.TryComplete(); + } + } + + private sealed class Subscriber(long id, Channel channel) + { + public long Id { get; } = id; + + public Channel Channel { get; } = channel; + } + + private sealed class SubscriberLease(SessionEventDistributor distributor, Subscriber subscriber) + : IEventSubscriberLease + { + private bool _disposed; + + public ChannelReader Reader => subscriber.Channel.Reader; + + public void Dispose() + { + if (_disposed) + { + return; + } + + _disposed = true; + distributor.Unregister(subscriber); + } + } +} diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs new file mode 100644 index 0000000..05d596a --- /dev/null +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs @@ -0,0 +1,126 @@ +using System.Threading.Channels; +using Microsoft.Extensions.Logging.Abstractions; +using ZB.MOM.WW.MxGateway.Contracts.Proto; +using ZB.MOM.WW.MxGateway.Server.Sessions; + +namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Sessions; + +/// +/// Concurrency and fan-out tests for , the +/// Session Resilience epic's per-session event pump. One pump drains the source +/// exactly once and fans every event to N independent per-subscriber channels. +/// Every async wait is bounded so a fan-out or shutdown deadlock fails fast. +/// +public sealed class SessionEventDistributorTests +{ + private static readonly TimeSpan ReadTimeout = TimeSpan.FromSeconds(5); + + [Fact] + public async Task TwoSubscribers_BothReceiveFannedEventsInOrder() + { + Channel source = Channel.CreateUnbounded(); + await using SessionEventDistributor distributor = CreateDistributor(source.Reader); + await distributor.StartAsync(CancellationToken.None); + + using IEventSubscriberLease leaseA = distributor.Register(); + using IEventSubscriberLease leaseB = distributor.Register(); + + source.Writer.TryWrite(Event(1)); + source.Writer.TryWrite(Event(2)); + + MxEvent a1 = await ReadOneAsync(leaseA.Reader); + MxEvent a2 = await ReadOneAsync(leaseA.Reader); + MxEvent b1 = await ReadOneAsync(leaseB.Reader); + MxEvent b2 = await ReadOneAsync(leaseB.Reader); + + Assert.Equal(1ul, a1.WorkerSequence); + Assert.Equal(2ul, a2.WorkerSequence); + Assert.Equal(1ul, b1.WorkerSequence); + Assert.Equal(2ul, b2.WorkerSequence); + } + + [Fact] + public async Task DisposingOneLease_StopsItsDelivery_OtherKeepsReceiving() + { + Channel source = Channel.CreateUnbounded(); + await using SessionEventDistributor distributor = CreateDistributor(source.Reader); + await distributor.StartAsync(CancellationToken.None); + + IEventSubscriberLease leaseA = distributor.Register(); + using IEventSubscriberLease leaseB = distributor.Register(); + + source.Writer.TryWrite(Event(1)); + _ = await ReadOneAsync(leaseA.Reader); + _ = await ReadOneAsync(leaseB.Reader); + + leaseA.Dispose(); + + // A's reader must complete (no more delivery) after dispose. + await AssertCompletedAsync(leaseA.Reader); + + // B still receives subsequent events. + source.Writer.TryWrite(Event(2)); + MxEvent b2 = await ReadOneAsync(leaseB.Reader); + Assert.Equal(2ul, b2.WorkerSequence); + } + + [Fact] + public async Task SubscriberRegisteredAfterStart_ReceivesEventsEmittedAfterRegistration() + { + Channel source = Channel.CreateUnbounded(); + await using SessionEventDistributor distributor = CreateDistributor(source.Reader); + await distributor.StartAsync(CancellationToken.None); + + using IEventSubscriberLease leaseA = distributor.Register(); + source.Writer.TryWrite(Event(1)); + _ = await ReadOneAsync(leaseA.Reader); + + // Late subscriber: only sees events emitted after it registered. + using IEventSubscriberLease leaseB = distributor.Register(); + source.Writer.TryWrite(Event(2)); + + MxEvent b = await ReadOneAsync(leaseB.Reader); + Assert.Equal(2ul, b.WorkerSequence); + } + + [Fact] + public async Task DisposingDistributor_CompletesAllSubscriberChannels_AndStopsPump() + { + Channel source = Channel.CreateUnbounded(); + SessionEventDistributor distributor = CreateDistributor(source.Reader); + await distributor.StartAsync(CancellationToken.None); + + using IEventSubscriberLease leaseA = distributor.Register(); + using IEventSubscriberLease leaseB = distributor.Register(); + + // Bounded so a shutdown hang fails fast. + await distributor.DisposeAsync().AsTask().WaitAsync(ReadTimeout); + + await AssertCompletedAsync(leaseA.Reader); + await AssertCompletedAsync(leaseB.Reader); + } + + private static SessionEventDistributor CreateDistributor(ChannelReader source) + => new( + "session-test", + ct => source.ReadAllAsync(ct), + subscriberQueueCapacity: 64, + NullLogger.Instance); + + private static MxEvent Event(ulong sequence) + => new() { SessionId = "session-test", WorkerSequence = sequence }; + + private static async Task ReadOneAsync(ChannelReader reader) + { + await reader.WaitToReadAsync().AsTask().WaitAsync(ReadTimeout); + Assert.True(reader.TryRead(out MxEvent? value)); + return value!; + } + + private static async Task AssertCompletedAsync(ChannelReader reader) + { + // Drain anything still buffered, then assert the channel is completed + // (no further events). Bounded so a never-completing channel fails fast. + await reader.Completion.WaitAsync(ReadTimeout); + } +}