feat(sessions): add SessionEventDistributor (pump + per-subscriber fan-out skeleton)

This commit is contained in:
Joseph Doherty
2026-06-15 12:32:13 -04:00
parent a43b2ee6af
commit c79b292968
3 changed files with 417 additions and 0 deletions
@@ -0,0 +1,18 @@
using System.Threading.Channels;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
namespace ZB.MOM.WW.MxGateway.Server.Sessions;
/// <summary>
/// A registration lease into a <see cref="SessionEventDistributor"/>. Exposes the
/// subscriber's own <see cref="ChannelReader{T}"/> of fanned events. Disposing the
/// lease unregisters the subscriber and completes its channel without disturbing the
/// pump or other subscribers.
/// </summary>
public interface IEventSubscriberLease : IDisposable
{
/// <summary>
/// Gets the reader for this subscriber's fanned event channel.
/// </summary>
ChannelReader<MxEvent> Reader { get; }
}
@@ -0,0 +1,273 @@
using System.Collections.Concurrent;
using System.Threading.Channels;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
namespace ZB.MOM.WW.MxGateway.Server.Sessions;
/// <summary>
/// Per-session event pump and fan-out. A single background task drains the
/// session's event source <em>exactly once</em> and fans each event out to
/// every currently-registered subscriber's own bounded channel.
/// </summary>
/// <remarks>
/// <para>
/// This is the skeleton introduced by Task 2 of the Session Resilience epic.
/// It is a standalone class — it is NOT yet wired into <c>GatewaySession</c> or
/// <c>EventStreamService</c> (Task 4), it has no replay ring buffer (Task 3),
/// no per-subscriber backpressure-isolation policy (Task 5), and it does not
/// remove the single-subscriber guard (Tasks 7/8).
/// </para>
/// <para>
/// <b>Source seam.</b> The event source is injected as a
/// <see cref="Func{T, TResult}"/> producing an
/// <see cref="IAsyncEnumerable{T}"/> of already-mapped public
/// <see cref="MxEvent"/>s, given a <see cref="CancellationToken"/>. This is the
/// cleanest seam for Task 4: it can pass
/// <c>ct =&gt; session.ReadEventsAsync(ct).Select(mapper.MapEvent)</c> (or a
/// channel reader's <c>ReadAllAsync</c>), while unit tests pass a plain
/// channel reader's <c>ReadAllAsync</c> with no real session. The pump owns the
/// single consumption of this enumerable; fan-out happens on the public
/// <see cref="MxEvent"/> after mapping, mirroring today's
/// <c>EventStreamService.ProduceEventsAsync</c> ordering.
/// </para>
/// <para>
/// <b>Concurrency.</b> The subscriber set is a
/// <see cref="ConcurrentDictionary{TKey, TValue}"/> keyed by a monotonic id.
/// The pump iterates it with a snapshot-free enumerator (which never throws on
/// concurrent add/remove), and <see cref="Register"/> / lease disposal mutate it
/// without any lock held across an <c>await</c>. Each subscriber channel has a
/// single writer — the pump — so per-channel writes never race. MXAccess parity:
/// events are fanned in the order received; the pump never reorders or
/// synthesizes events.
/// </para>
/// </remarks>
public sealed class SessionEventDistributor : IAsyncDisposable
{
private readonly string _sessionId;
private readonly Func<CancellationToken, IAsyncEnumerable<MxEvent>> _eventSourceFactory;
private readonly int _subscriberQueueCapacity;
private readonly ILogger<SessionEventDistributor> _logger;
private readonly ConcurrentDictionary<long, Subscriber> _subscribers = new();
private readonly CancellationTokenSource _shutdownCts = new();
private readonly object _lifecycleLock = new();
private long _nextSubscriberId;
private Task? _pumpTask;
private bool _started;
private bool _disposed;
/// <summary>
/// Initializes a per-session event distributor.
/// </summary>
/// <param name="sessionId">Owning session id, used only for logging context.</param>
/// <param name="eventSourceFactory">
/// Factory producing the session's event stream given a cancellation token.
/// The pump consumes this exactly once. See the type remarks for the seam Task 4
/// plugs into.
/// </param>
/// <param name="subscriberQueueCapacity">
/// Bounded capacity of each per-subscriber channel. Mirrors the gRPC event-stream
/// queue capacity shape used today.
/// </param>
/// <param name="logger">Logger for pump lifecycle diagnostics.</param>
public SessionEventDistributor(
string sessionId,
Func<CancellationToken, IAsyncEnumerable<MxEvent>> eventSourceFactory,
int subscriberQueueCapacity,
ILogger<SessionEventDistributor> logger)
{
ArgumentException.ThrowIfNullOrWhiteSpace(sessionId);
ArgumentNullException.ThrowIfNull(eventSourceFactory);
ArgumentOutOfRangeException.ThrowIfLessThan(subscriberQueueCapacity, 1);
ArgumentNullException.ThrowIfNull(logger);
_sessionId = sessionId;
_eventSourceFactory = eventSourceFactory;
_subscriberQueueCapacity = subscriberQueueCapacity;
_logger = logger;
}
/// <summary>
/// Gets the count of currently-registered subscribers.
/// </summary>
public int SubscriberCount => _subscribers.Count;
/// <summary>
/// Starts the background pump. Idempotent — a second call is a no-op.
/// </summary>
/// <param name="cancellationToken">Token observed only while starting.</param>
public Task StartAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
lock (_lifecycleLock)
{
ObjectDisposedException.ThrowIf(_disposed, this);
if (_started)
{
return Task.CompletedTask;
}
_started = true;
_pumpTask = Task.Run(() => PumpAsync(_shutdownCts.Token), CancellationToken.None);
}
return Task.CompletedTask;
}
/// <summary>
/// Registers a new subscriber and returns its lease. The lease exposes the
/// subscriber's <see cref="ChannelReader{T}"/> and, when disposed, unregisters the
/// subscriber and completes its channel without disturbing the pump or other
/// subscribers.
/// </summary>
public IEventSubscriberLease Register()
{
lock (_lifecycleLock)
{
ObjectDisposedException.ThrowIf(_disposed, this);
}
// The pump is the single writer for this channel; readers are single-consumer
// (one gRPC stream / dashboard subscriber). Synchronous continuations are
// disabled so a slow reader can never stall the pump on its completion.
Channel<MxEvent> channel = Channel.CreateBounded<MxEvent>(
new BoundedChannelOptions(_subscriberQueueCapacity)
{
SingleReader = true,
SingleWriter = true,
FullMode = BoundedChannelFullMode.Wait,
AllowSynchronousContinuations = false,
});
long id = Interlocked.Increment(ref _nextSubscriberId);
Subscriber subscriber = new(id, channel);
_subscribers[id] = subscriber;
// Disposal between the add and a concurrent DisposeAsync: DisposeAsync drains
// and completes every subscriber currently in the map, so this entry is covered.
return new SubscriberLease(this, subscriber);
}
/// <summary>
/// Stops the pump and completes all subscriber channels. Idempotent.
/// </summary>
public async ValueTask DisposeAsync()
{
Task? pumpTask;
lock (_lifecycleLock)
{
if (_disposed)
{
return;
}
_disposed = true;
pumpTask = _pumpTask;
}
// Signal the pump to stop. It must not block on a non-reading subscriber:
// it writes with non-blocking TryWrite, so cancellation tears it down promptly.
await _shutdownCts.CancelAsync().ConfigureAwait(false);
if (pumpTask is not null)
{
try
{
await pumpTask.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
}
catch (Exception exception)
{
_logger.LogDebug(
exception,
"Event distributor pump faulted during shutdown for session {SessionId}.",
_sessionId);
}
}
CompleteAllSubscribers(error: null);
_shutdownCts.Dispose();
}
private async Task PumpAsync(CancellationToken cancellationToken)
{
try
{
await foreach (MxEvent mxEvent in _eventSourceFactory(cancellationToken)
.WithCancellation(cancellationToken)
.ConfigureAwait(false))
{
// Enumerating a ConcurrentDictionary's Values never throws on concurrent
// add/remove; a subscriber registered mid-iteration may miss this event,
// which matches "late subscribers see events after they register".
foreach (Subscriber subscriber in _subscribers.Values)
{
// TODO(Task 5): define overflow policy (per-subscriber isolation —
// drop / disconnect / fault that one subscriber). For the Task 2
// skeleton, a non-blocking TryWrite that silently drops on a full
// channel is the placeholder so one slow reader never stalls the pump.
subscriber.Channel.Writer.TryWrite(mxEvent);
}
}
CompleteAllSubscribers(error: null);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// Shutdown path: DisposeAsync completes subscribers.
}
catch (Exception exception)
{
_logger.LogDebug(
exception,
"Event distributor source faulted for session {SessionId}.",
_sessionId);
CompleteAllSubscribers(exception);
}
}
private void CompleteAllSubscribers(Exception? error)
{
foreach (Subscriber subscriber in _subscribers.Values)
{
subscriber.Channel.Writer.TryComplete(error);
}
}
private void Unregister(Subscriber subscriber)
{
if (_subscribers.TryRemove(subscriber.Id, out _))
{
subscriber.Channel.Writer.TryComplete();
}
}
private sealed class Subscriber(long id, Channel<MxEvent> channel)
{
public long Id { get; } = id;
public Channel<MxEvent> Channel { get; } = channel;
}
private sealed class SubscriberLease(SessionEventDistributor distributor, Subscriber subscriber)
: IEventSubscriberLease
{
private bool _disposed;
public ChannelReader<MxEvent> Reader => subscriber.Channel.Reader;
public void Dispose()
{
if (_disposed)
{
return;
}
_disposed = true;
distributor.Unregister(subscriber);
}
}
}
@@ -0,0 +1,126 @@
using System.Threading.Channels;
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Server.Sessions;
namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Sessions;
/// <summary>
/// Concurrency and fan-out tests for <see cref="SessionEventDistributor"/>, the
/// Session Resilience epic's per-session event pump. One pump drains the source
/// exactly once and fans every event to N independent per-subscriber channels.
/// Every async wait is bounded so a fan-out or shutdown deadlock fails fast.
/// </summary>
public sealed class SessionEventDistributorTests
{
private static readonly TimeSpan ReadTimeout = TimeSpan.FromSeconds(5);
[Fact]
public async Task TwoSubscribers_BothReceiveFannedEventsInOrder()
{
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
await using SessionEventDistributor distributor = CreateDistributor(source.Reader);
await distributor.StartAsync(CancellationToken.None);
using IEventSubscriberLease leaseA = distributor.Register();
using IEventSubscriberLease leaseB = distributor.Register();
source.Writer.TryWrite(Event(1));
source.Writer.TryWrite(Event(2));
MxEvent a1 = await ReadOneAsync(leaseA.Reader);
MxEvent a2 = await ReadOneAsync(leaseA.Reader);
MxEvent b1 = await ReadOneAsync(leaseB.Reader);
MxEvent b2 = await ReadOneAsync(leaseB.Reader);
Assert.Equal(1ul, a1.WorkerSequence);
Assert.Equal(2ul, a2.WorkerSequence);
Assert.Equal(1ul, b1.WorkerSequence);
Assert.Equal(2ul, b2.WorkerSequence);
}
[Fact]
public async Task DisposingOneLease_StopsItsDelivery_OtherKeepsReceiving()
{
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
await using SessionEventDistributor distributor = CreateDistributor(source.Reader);
await distributor.StartAsync(CancellationToken.None);
IEventSubscriberLease leaseA = distributor.Register();
using IEventSubscriberLease leaseB = distributor.Register();
source.Writer.TryWrite(Event(1));
_ = await ReadOneAsync(leaseA.Reader);
_ = await ReadOneAsync(leaseB.Reader);
leaseA.Dispose();
// A's reader must complete (no more delivery) after dispose.
await AssertCompletedAsync(leaseA.Reader);
// B still receives subsequent events.
source.Writer.TryWrite(Event(2));
MxEvent b2 = await ReadOneAsync(leaseB.Reader);
Assert.Equal(2ul, b2.WorkerSequence);
}
[Fact]
public async Task SubscriberRegisteredAfterStart_ReceivesEventsEmittedAfterRegistration()
{
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
await using SessionEventDistributor distributor = CreateDistributor(source.Reader);
await distributor.StartAsync(CancellationToken.None);
using IEventSubscriberLease leaseA = distributor.Register();
source.Writer.TryWrite(Event(1));
_ = await ReadOneAsync(leaseA.Reader);
// Late subscriber: only sees events emitted after it registered.
using IEventSubscriberLease leaseB = distributor.Register();
source.Writer.TryWrite(Event(2));
MxEvent b = await ReadOneAsync(leaseB.Reader);
Assert.Equal(2ul, b.WorkerSequence);
}
[Fact]
public async Task DisposingDistributor_CompletesAllSubscriberChannels_AndStopsPump()
{
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
SessionEventDistributor distributor = CreateDistributor(source.Reader);
await distributor.StartAsync(CancellationToken.None);
using IEventSubscriberLease leaseA = distributor.Register();
using IEventSubscriberLease leaseB = distributor.Register();
// Bounded so a shutdown hang fails fast.
await distributor.DisposeAsync().AsTask().WaitAsync(ReadTimeout);
await AssertCompletedAsync(leaseA.Reader);
await AssertCompletedAsync(leaseB.Reader);
}
private static SessionEventDistributor CreateDistributor(ChannelReader<MxEvent> source)
=> new(
"session-test",
ct => source.ReadAllAsync(ct),
subscriberQueueCapacity: 64,
NullLogger<SessionEventDistributor>.Instance);
private static MxEvent Event(ulong sequence)
=> new() { SessionId = "session-test", WorkerSequence = sequence };
private static async Task<MxEvent> ReadOneAsync(ChannelReader<MxEvent> reader)
{
await reader.WaitToReadAsync().AsTask().WaitAsync(ReadTimeout);
Assert.True(reader.TryRead(out MxEvent? value));
return value!;
}
private static async Task AssertCompletedAsync(ChannelReader<MxEvent> reader)
{
// Drain anything still buffered, then assert the channel is completed
// (no further events). Bounded so a never-completing channel fails fast.
await reader.Completion.WaitAsync(ReadTimeout);
}
}