diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/IEventSubscriberLease.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/IEventSubscriberLease.cs
new file mode 100644
index 0000000..fb964e7
--- /dev/null
+++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/IEventSubscriberLease.cs
@@ -0,0 +1,18 @@
+using System.Threading.Channels;
+using ZB.MOM.WW.MxGateway.Contracts.Proto;
+
+namespace ZB.MOM.WW.MxGateway.Server.Sessions;
+
+///
+/// A registration lease into a . Exposes the
+/// subscriber's own of fanned events. Disposing the
+/// lease unregisters the subscriber and completes its channel without disturbing the
+/// pump or other subscribers.
+///
+public interface IEventSubscriberLease : IDisposable
+{
+ ///
+ /// Gets the reader for this subscriber's fanned event channel.
+ ///
+ ChannelReader Reader { get; }
+}
diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs
new file mode 100644
index 0000000..f4f1d60
--- /dev/null
+++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs
@@ -0,0 +1,273 @@
+using System.Collections.Concurrent;
+using System.Threading.Channels;
+using ZB.MOM.WW.MxGateway.Contracts.Proto;
+
+namespace ZB.MOM.WW.MxGateway.Server.Sessions;
+
+///
+/// Per-session event pump and fan-out. A single background task drains the
+/// session's event source exactly once and fans each event out to
+/// every currently-registered subscriber's own bounded channel.
+///
+///
+///
+/// This is the skeleton introduced by Task 2 of the Session Resilience epic.
+/// It is a standalone class — it is NOT yet wired into GatewaySession or
+/// EventStreamService (Task 4), it has no replay ring buffer (Task 3),
+/// no per-subscriber backpressure-isolation policy (Task 5), and it does not
+/// remove the single-subscriber guard (Tasks 7/8).
+///
+///
+/// Source seam. The event source is injected as a
+/// producing an
+/// of already-mapped public
+/// s, given a . This is the
+/// cleanest seam for Task 4: it can pass
+/// ct => session.ReadEventsAsync(ct).Select(mapper.MapEvent) (or a
+/// channel reader's ReadAllAsync), while unit tests pass a plain
+/// channel reader's ReadAllAsync with no real session. The pump owns the
+/// single consumption of this enumerable; fan-out happens on the public
+/// after mapping, mirroring today's
+/// EventStreamService.ProduceEventsAsync ordering.
+///
+///
+/// Concurrency. The subscriber set is a
+/// keyed by a monotonic id.
+/// The pump iterates it with a snapshot-free enumerator (which never throws on
+/// concurrent add/remove), and / lease disposal mutate it
+/// without any lock held across an await. Each subscriber channel has a
+/// single writer — the pump — so per-channel writes never race. MXAccess parity:
+/// events are fanned in the order received; the pump never reorders or
+/// synthesizes events.
+///
+///
+public sealed class SessionEventDistributor : IAsyncDisposable
+{
+ private readonly string _sessionId;
+ private readonly Func> _eventSourceFactory;
+ private readonly int _subscriberQueueCapacity;
+ private readonly ILogger _logger;
+ private readonly ConcurrentDictionary _subscribers = new();
+ private readonly CancellationTokenSource _shutdownCts = new();
+ private readonly object _lifecycleLock = new();
+
+ private long _nextSubscriberId;
+ private Task? _pumpTask;
+ private bool _started;
+ private bool _disposed;
+
+ ///
+ /// Initializes a per-session event distributor.
+ ///
+ /// Owning session id, used only for logging context.
+ ///
+ /// Factory producing the session's event stream given a cancellation token.
+ /// The pump consumes this exactly once. See the type remarks for the seam Task 4
+ /// plugs into.
+ ///
+ ///
+ /// Bounded capacity of each per-subscriber channel. Mirrors the gRPC event-stream
+ /// queue capacity shape used today.
+ ///
+ /// Logger for pump lifecycle diagnostics.
+ public SessionEventDistributor(
+ string sessionId,
+ Func> eventSourceFactory,
+ int subscriberQueueCapacity,
+ ILogger logger)
+ {
+ ArgumentException.ThrowIfNullOrWhiteSpace(sessionId);
+ ArgumentNullException.ThrowIfNull(eventSourceFactory);
+ ArgumentOutOfRangeException.ThrowIfLessThan(subscriberQueueCapacity, 1);
+ ArgumentNullException.ThrowIfNull(logger);
+
+ _sessionId = sessionId;
+ _eventSourceFactory = eventSourceFactory;
+ _subscriberQueueCapacity = subscriberQueueCapacity;
+ _logger = logger;
+ }
+
+ ///
+ /// Gets the count of currently-registered subscribers.
+ ///
+ public int SubscriberCount => _subscribers.Count;
+
+ ///
+ /// Starts the background pump. Idempotent — a second call is a no-op.
+ ///
+ /// Token observed only while starting.
+ public Task StartAsync(CancellationToken cancellationToken)
+ {
+ cancellationToken.ThrowIfCancellationRequested();
+
+ lock (_lifecycleLock)
+ {
+ ObjectDisposedException.ThrowIf(_disposed, this);
+ if (_started)
+ {
+ return Task.CompletedTask;
+ }
+
+ _started = true;
+ _pumpTask = Task.Run(() => PumpAsync(_shutdownCts.Token), CancellationToken.None);
+ }
+
+ return Task.CompletedTask;
+ }
+
+ ///
+ /// Registers a new subscriber and returns its lease. The lease exposes the
+ /// subscriber's and, when disposed, unregisters the
+ /// subscriber and completes its channel without disturbing the pump or other
+ /// subscribers.
+ ///
+ public IEventSubscriberLease Register()
+ {
+ lock (_lifecycleLock)
+ {
+ ObjectDisposedException.ThrowIf(_disposed, this);
+ }
+
+ // The pump is the single writer for this channel; readers are single-consumer
+ // (one gRPC stream / dashboard subscriber). Synchronous continuations are
+ // disabled so a slow reader can never stall the pump on its completion.
+ Channel channel = Channel.CreateBounded(
+ new BoundedChannelOptions(_subscriberQueueCapacity)
+ {
+ SingleReader = true,
+ SingleWriter = true,
+ FullMode = BoundedChannelFullMode.Wait,
+ AllowSynchronousContinuations = false,
+ });
+
+ long id = Interlocked.Increment(ref _nextSubscriberId);
+ Subscriber subscriber = new(id, channel);
+ _subscribers[id] = subscriber;
+
+ // Disposal between the add and a concurrent DisposeAsync: DisposeAsync drains
+ // and completes every subscriber currently in the map, so this entry is covered.
+ return new SubscriberLease(this, subscriber);
+ }
+
+ ///
+ /// Stops the pump and completes all subscriber channels. Idempotent.
+ ///
+ public async ValueTask DisposeAsync()
+ {
+ Task? pumpTask;
+ lock (_lifecycleLock)
+ {
+ if (_disposed)
+ {
+ return;
+ }
+
+ _disposed = true;
+ pumpTask = _pumpTask;
+ }
+
+ // Signal the pump to stop. It must not block on a non-reading subscriber:
+ // it writes with non-blocking TryWrite, so cancellation tears it down promptly.
+ await _shutdownCts.CancelAsync().ConfigureAwait(false);
+
+ if (pumpTask is not null)
+ {
+ try
+ {
+ await pumpTask.ConfigureAwait(false);
+ }
+ catch (OperationCanceledException)
+ {
+ }
+ catch (Exception exception)
+ {
+ _logger.LogDebug(
+ exception,
+ "Event distributor pump faulted during shutdown for session {SessionId}.",
+ _sessionId);
+ }
+ }
+
+ CompleteAllSubscribers(error: null);
+ _shutdownCts.Dispose();
+ }
+
+ private async Task PumpAsync(CancellationToken cancellationToken)
+ {
+ try
+ {
+ await foreach (MxEvent mxEvent in _eventSourceFactory(cancellationToken)
+ .WithCancellation(cancellationToken)
+ .ConfigureAwait(false))
+ {
+ // Enumerating a ConcurrentDictionary's Values never throws on concurrent
+ // add/remove; a subscriber registered mid-iteration may miss this event,
+ // which matches "late subscribers see events after they register".
+ foreach (Subscriber subscriber in _subscribers.Values)
+ {
+ // TODO(Task 5): define overflow policy (per-subscriber isolation —
+ // drop / disconnect / fault that one subscriber). For the Task 2
+ // skeleton, a non-blocking TryWrite that silently drops on a full
+ // channel is the placeholder so one slow reader never stalls the pump.
+ subscriber.Channel.Writer.TryWrite(mxEvent);
+ }
+ }
+
+ CompleteAllSubscribers(error: null);
+ }
+ catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
+ {
+ // Shutdown path: DisposeAsync completes subscribers.
+ }
+ catch (Exception exception)
+ {
+ _logger.LogDebug(
+ exception,
+ "Event distributor source faulted for session {SessionId}.",
+ _sessionId);
+ CompleteAllSubscribers(exception);
+ }
+ }
+
+ private void CompleteAllSubscribers(Exception? error)
+ {
+ foreach (Subscriber subscriber in _subscribers.Values)
+ {
+ subscriber.Channel.Writer.TryComplete(error);
+ }
+ }
+
+ private void Unregister(Subscriber subscriber)
+ {
+ if (_subscribers.TryRemove(subscriber.Id, out _))
+ {
+ subscriber.Channel.Writer.TryComplete();
+ }
+ }
+
+ private sealed class Subscriber(long id, Channel channel)
+ {
+ public long Id { get; } = id;
+
+ public Channel Channel { get; } = channel;
+ }
+
+ private sealed class SubscriberLease(SessionEventDistributor distributor, Subscriber subscriber)
+ : IEventSubscriberLease
+ {
+ private bool _disposed;
+
+ public ChannelReader Reader => subscriber.Channel.Reader;
+
+ public void Dispose()
+ {
+ if (_disposed)
+ {
+ return;
+ }
+
+ _disposed = true;
+ distributor.Unregister(subscriber);
+ }
+ }
+}
diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs
new file mode 100644
index 0000000..05d596a
--- /dev/null
+++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs
@@ -0,0 +1,126 @@
+using System.Threading.Channels;
+using Microsoft.Extensions.Logging.Abstractions;
+using ZB.MOM.WW.MxGateway.Contracts.Proto;
+using ZB.MOM.WW.MxGateway.Server.Sessions;
+
+namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Sessions;
+
+///
+/// Concurrency and fan-out tests for , the
+/// Session Resilience epic's per-session event pump. One pump drains the source
+/// exactly once and fans every event to N independent per-subscriber channels.
+/// Every async wait is bounded so a fan-out or shutdown deadlock fails fast.
+///
+public sealed class SessionEventDistributorTests
+{
+ private static readonly TimeSpan ReadTimeout = TimeSpan.FromSeconds(5);
+
+ [Fact]
+ public async Task TwoSubscribers_BothReceiveFannedEventsInOrder()
+ {
+ Channel source = Channel.CreateUnbounded();
+ await using SessionEventDistributor distributor = CreateDistributor(source.Reader);
+ await distributor.StartAsync(CancellationToken.None);
+
+ using IEventSubscriberLease leaseA = distributor.Register();
+ using IEventSubscriberLease leaseB = distributor.Register();
+
+ source.Writer.TryWrite(Event(1));
+ source.Writer.TryWrite(Event(2));
+
+ MxEvent a1 = await ReadOneAsync(leaseA.Reader);
+ MxEvent a2 = await ReadOneAsync(leaseA.Reader);
+ MxEvent b1 = await ReadOneAsync(leaseB.Reader);
+ MxEvent b2 = await ReadOneAsync(leaseB.Reader);
+
+ Assert.Equal(1ul, a1.WorkerSequence);
+ Assert.Equal(2ul, a2.WorkerSequence);
+ Assert.Equal(1ul, b1.WorkerSequence);
+ Assert.Equal(2ul, b2.WorkerSequence);
+ }
+
+ [Fact]
+ public async Task DisposingOneLease_StopsItsDelivery_OtherKeepsReceiving()
+ {
+ Channel source = Channel.CreateUnbounded();
+ await using SessionEventDistributor distributor = CreateDistributor(source.Reader);
+ await distributor.StartAsync(CancellationToken.None);
+
+ IEventSubscriberLease leaseA = distributor.Register();
+ using IEventSubscriberLease leaseB = distributor.Register();
+
+ source.Writer.TryWrite(Event(1));
+ _ = await ReadOneAsync(leaseA.Reader);
+ _ = await ReadOneAsync(leaseB.Reader);
+
+ leaseA.Dispose();
+
+ // A's reader must complete (no more delivery) after dispose.
+ await AssertCompletedAsync(leaseA.Reader);
+
+ // B still receives subsequent events.
+ source.Writer.TryWrite(Event(2));
+ MxEvent b2 = await ReadOneAsync(leaseB.Reader);
+ Assert.Equal(2ul, b2.WorkerSequence);
+ }
+
+ [Fact]
+ public async Task SubscriberRegisteredAfterStart_ReceivesEventsEmittedAfterRegistration()
+ {
+ Channel source = Channel.CreateUnbounded();
+ await using SessionEventDistributor distributor = CreateDistributor(source.Reader);
+ await distributor.StartAsync(CancellationToken.None);
+
+ using IEventSubscriberLease leaseA = distributor.Register();
+ source.Writer.TryWrite(Event(1));
+ _ = await ReadOneAsync(leaseA.Reader);
+
+ // Late subscriber: only sees events emitted after it registered.
+ using IEventSubscriberLease leaseB = distributor.Register();
+ source.Writer.TryWrite(Event(2));
+
+ MxEvent b = await ReadOneAsync(leaseB.Reader);
+ Assert.Equal(2ul, b.WorkerSequence);
+ }
+
+ [Fact]
+ public async Task DisposingDistributor_CompletesAllSubscriberChannels_AndStopsPump()
+ {
+ Channel source = Channel.CreateUnbounded();
+ SessionEventDistributor distributor = CreateDistributor(source.Reader);
+ await distributor.StartAsync(CancellationToken.None);
+
+ using IEventSubscriberLease leaseA = distributor.Register();
+ using IEventSubscriberLease leaseB = distributor.Register();
+
+ // Bounded so a shutdown hang fails fast.
+ await distributor.DisposeAsync().AsTask().WaitAsync(ReadTimeout);
+
+ await AssertCompletedAsync(leaseA.Reader);
+ await AssertCompletedAsync(leaseB.Reader);
+ }
+
+ private static SessionEventDistributor CreateDistributor(ChannelReader source)
+ => new(
+ "session-test",
+ ct => source.ReadAllAsync(ct),
+ subscriberQueueCapacity: 64,
+ NullLogger.Instance);
+
+ private static MxEvent Event(ulong sequence)
+ => new() { SessionId = "session-test", WorkerSequence = sequence };
+
+ private static async Task ReadOneAsync(ChannelReader reader)
+ {
+ await reader.WaitToReadAsync().AsTask().WaitAsync(ReadTimeout);
+ Assert.True(reader.TryRead(out MxEvent? value));
+ return value!;
+ }
+
+ private static async Task AssertCompletedAsync(ChannelReader reader)
+ {
+ // Drain anything still buffered, then assert the channel is completed
+ // (no further events). Bounded so a never-completing channel fails fast.
+ await reader.Completion.WaitAsync(ReadTimeout);
+ }
+}