using System.Threading.Channels;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Time.Testing;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Server.Sessions;

namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Sessions;

/// <summary>
/// Concurrency and fan-out tests for <see cref="SessionEventDistributor"/>, the
/// Session Resilience epic's per-session event pump. One pump drains the source
/// exactly once and fans every event to N independent per-subscriber channels.
/// Every async wait is bounded so a fan-out or shutdown deadlock fails fast.
/// </summary>
public sealed class SessionEventDistributorTests
{
    /// <summary>Upper bound applied to every asynchronous wait in this class.</summary>
    private static readonly TimeSpan ReadTimeout = TimeSpan.FromSeconds(5);

    [Fact]
    public async Task TwoSubscribers_BothReceiveFannedEventsInOrder()
    {
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        await using SessionEventDistributor distributor = CreateDistributor(source.Reader);
        await distributor.StartAsync(CancellationToken.None);

        using IEventSubscriberLease leaseA = distributor.Register();
        using IEventSubscriberLease leaseB = distributor.Register();

        source.Writer.TryWrite(Event(1));
        source.Writer.TryWrite(Event(2));

        MxEvent a1 = await ReadOneAsync(leaseA.Reader);
        MxEvent a2 = await ReadOneAsync(leaseA.Reader);
        MxEvent b1 = await ReadOneAsync(leaseB.Reader);
        MxEvent b2 = await ReadOneAsync(leaseB.Reader);

        Assert.Equal(1ul, a1.WorkerSequence);
        Assert.Equal(2ul, a2.WorkerSequence);
        Assert.Equal(1ul, b1.WorkerSequence);
        Assert.Equal(2ul, b2.WorkerSequence);
    }

    [Fact]
    public async Task DisposingOneLease_StopsItsDelivery_OtherKeepsReceiving()
    {
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        await using SessionEventDistributor distributor = CreateDistributor(source.Reader);
        await distributor.StartAsync(CancellationToken.None);

        IEventSubscriberLease leaseA = distributor.Register();
        using IEventSubscriberLease leaseB = distributor.Register();

        source.Writer.TryWrite(Event(1));
        _ = await ReadOneAsync(leaseA.Reader);
        _ = await ReadOneAsync(leaseB.Reader);

        leaseA.Dispose();

        // A's reader must complete (no more delivery) after dispose.
        await AssertCompletedAsync(leaseA.Reader);

        // B still receives subsequent events.
        source.Writer.TryWrite(Event(2));
        MxEvent b2 = await ReadOneAsync(leaseB.Reader);
        Assert.Equal(2ul, b2.WorkerSequence);
    }

    [Fact]
    public async Task SubscriberRegisteredAfterStart_ReceivesEventsEmittedAfterRegistration()
    {
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        await using SessionEventDistributor distributor = CreateDistributor(source.Reader);
        await distributor.StartAsync(CancellationToken.None);

        using IEventSubscriberLease leaseA = distributor.Register();
        source.Writer.TryWrite(Event(1));
        _ = await ReadOneAsync(leaseA.Reader);

        // Late subscriber: only sees events emitted after it registered.
        using IEventSubscriberLease leaseB = distributor.Register();
        source.Writer.TryWrite(Event(2));

        MxEvent b = await ReadOneAsync(leaseB.Reader);
        Assert.Equal(2ul, b.WorkerSequence);
    }

    [Fact]
    public async Task DisposingDistributor_CompletesAllSubscriberChannels_AndStopsPump()
    {
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        SessionEventDistributor distributor = CreateDistributor(source.Reader);
        await distributor.StartAsync(CancellationToken.None);

        using IEventSubscriberLease leaseA = distributor.Register();
        using IEventSubscriberLease leaseB = distributor.Register();

        // Bounded so a shutdown hang fails fast.
        await distributor.DisposeAsync().AsTask().WaitAsync(ReadTimeout);

        await AssertCompletedAsync(leaseA.Reader);
        await AssertCompletedAsync(leaseB.Reader);
    }

    [Fact]
    public async Task Register_AfterDispose_ThrowsObjectDisposedException()
    {
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        SessionEventDistributor distributor = CreateDistributor(source.Reader);
        await distributor.StartAsync(CancellationToken.None);
        await distributor.DisposeAsync().AsTask().WaitAsync(ReadTimeout);

        Assert.Throws<ObjectDisposedException>(() => distributor.Register());
    }

    [Fact]
    public async Task RegisterWithReplay_AfterDispose_ThrowsObjectDisposedException()
    {
        // Pins the nested-lock disposal behavior in RegisterWithReplay: the inner
        // _lifecycleLock check must surface ObjectDisposedException even when the outer
        // _replayLock snapshot succeeds on a disposed distributor.
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        SessionEventDistributor distributor = CreateDistributor(source.Reader);
        await distributor.StartAsync(CancellationToken.None);
        await distributor.DisposeAsync().AsTask().WaitAsync(ReadTimeout);

        Assert.Throws<ObjectDisposedException>(() => distributor.RegisterWithReplay(
            0,
            out _,
            out _,
            out _,
            out _));
    }

    [Fact]
    public async Task ReplayBuffer_OverCapacity_EvictsOldestFirst_AndReportsGap()
    {
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        await using SessionEventDistributor distributor = CreateDistributor(
            source.Reader,
            replayBufferCapacity: 3,
            replayRetentionSeconds: 0);
        await distributor.StartAsync(CancellationToken.None);

        // A live subscriber forces the pump to fan (and thereby retain) each event,
        // and gives us a deterministic point to know the pump has processed event 5.
        using IEventSubscriberLease lease = distributor.Register();
        for (ulong sequence = 1; sequence <= 5; sequence++)
        {
            source.Writer.TryWrite(Event(sequence));
        }

        for (ulong sequence = 1; sequence <= 5; sequence++)
        {
            MxEvent e = await ReadOneAsync(lease.Reader);
            Assert.Equal(sequence, e.WorkerSequence);
        }

        // Capacity 3 retains only the newest three: sequences 3, 4, 5. Events 1 and 2
        // were evicted, so a caller asking from 0 missed events => gap=true, and it
        // gets only the retained tail.
        bool found = distributor.TryGetReplayFrom(0, out IReadOnlyList<MxEvent> replay, out bool gap);

        Assert.True(found);
        Assert.True(gap);
        Assert.Equal(new ulong[] { 3, 4, 5 }, replay.Select(e => e.WorkerSequence));
    }

    [Fact]
    public async Task ReplayBuffer_WithinRetainedWindow_ReturnsNewerEvents_NoGap()
    {
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        await using SessionEventDistributor distributor = CreateDistributor(
            source.Reader,
            replayBufferCapacity: 10,
            replayRetentionSeconds: 0);
        await distributor.StartAsync(CancellationToken.None);

        using IEventSubscriberLease lease = distributor.Register();
        for (ulong sequence = 1; sequence <= 5; sequence++)
        {
            source.Writer.TryWrite(Event(sequence));
            _ = await ReadOneAsync(lease.Reader);
        }

        // afterSequence 2 is still inside the retained window [1..5], so no gap and
        // exactly the newer events 3, 4, 5 come back.
        bool found = distributor.TryGetReplayFrom(2, out IReadOnlyList<MxEvent> replay, out bool gap);

        Assert.True(found);
        Assert.False(gap);
        Assert.Equal(new ulong[] { 3, 4, 5 }, replay.Select(e => e.WorkerSequence));
    }

    [Fact]
    public async Task ReplayBuffer_AgedEntries_AreEvictedAfterRetentionElapses()
    {
        FakeTimeProvider time = new(new DateTimeOffset(2026, 1, 1, 0, 0, 0, TimeSpan.Zero));
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        await using SessionEventDistributor distributor = CreateDistributor(
            source.Reader,
            replayBufferCapacity: 100,
            replayRetentionSeconds: 30,
            timeProvider: time);
        await distributor.StartAsync(CancellationToken.None);

        using IEventSubscriberLease lease = distributor.Register();

        // Two old events, then advance the clock well past the retention window.
        source.Writer.TryWrite(Event(1));
        source.Writer.TryWrite(Event(2));
        _ = await ReadOneAsync(lease.Reader);
        _ = await ReadOneAsync(lease.Reader);

        time.Advance(TimeSpan.FromSeconds(60));

        // A fresh event triggers age-eviction of the now-stale entries 1 and 2.
        source.Writer.TryWrite(Event(3));
        _ = await ReadOneAsync(lease.Reader);

        bool found = distributor.TryGetReplayFrom(0, out IReadOnlyList<MxEvent> replay, out bool gap);

        Assert.True(found);

        // Events 1 and 2 aged out; only 3 remains, and 0 predates the oldest retained.
        Assert.Equal(new ulong[] { 3 }, replay.Select(e => e.WorkerSequence));
        Assert.True(gap);
    }

    [Fact]
    public async Task ReplayBuffer_AfterSequenceNewerThanAllRetained_ReturnsEmpty_NoGap()
    {
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        await using SessionEventDistributor distributor = CreateDistributor(
            source.Reader,
            replayBufferCapacity: 10,
            replayRetentionSeconds: 0);
        await distributor.StartAsync(CancellationToken.None);

        using IEventSubscriberLease lease = distributor.Register();
        for (ulong sequence = 1; sequence <= 3; sequence++)
        {
            source.Writer.TryWrite(Event(sequence));
            _ = await ReadOneAsync(lease.Reader);
        }

        // afterSequence 3 is at/after the newest retained; nothing newer, and the
        // caller is fully caught up => empty list, gap=false.
        bool found = distributor.TryGetReplayFrom(3, out IReadOnlyList<MxEvent> replay, out bool gap);

        Assert.True(found);
        Assert.False(gap);
        Assert.Empty(replay);
    }

    [Fact]
    public async Task ReplayBuffer_Capacity0_AfterSequenceBelowHighestSeen_ReportsGap_NoEvents()
    {
        // Disabled buffer: events are tracked for the highest-seen counter but not
        // retained. A caller behind the highest-seen sequence must be told to re-snapshot.
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        await using SessionEventDistributor distributor = CreateDistributor(
            source.Reader,
            replayBufferCapacity: 0,
            replayRetentionSeconds: 0);
        await distributor.StartAsync(CancellationToken.None);

        using IEventSubscriberLease lease = distributor.Register();
        for (ulong sequence = 1; sequence <= 3; sequence++)
        {
            source.Writer.TryWrite(Event(sequence));
            _ = await ReadOneAsync(lease.Reader);
        }

        // afterSequence=1 is below highestSeen=3 — gap, nothing to replay.
        bool found = distributor.TryGetReplayFrom(1, out IReadOnlyList<MxEvent> replay, out bool gap);

        Assert.True(found);
        Assert.True(gap);
        Assert.Empty(replay);
    }

    [Fact]
    public async Task ReplayBuffer_Capacity0_AfterSequenceAtOrAboveHighestSeen_NoGap_NoEvents()
    {
        // Disabled buffer: caller is already caught up — no gap, nothing to replay.
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        await using SessionEventDistributor distributor = CreateDistributor(
            source.Reader,
            replayBufferCapacity: 0,
            replayRetentionSeconds: 0);
        await distributor.StartAsync(CancellationToken.None);

        using IEventSubscriberLease lease = distributor.Register();
        for (ulong sequence = 1; sequence <= 3; sequence++)
        {
            source.Writer.TryWrite(Event(sequence));
            _ = await ReadOneAsync(lease.Reader);
        }

        // afterSequence=3 equals highestSeen — caller is fully caught up.
        bool found = distributor.TryGetReplayFrom(3, out IReadOnlyList<MxEvent> replay, out bool gap);

        Assert.True(found);
        Assert.False(gap);
        Assert.Empty(replay);
    }

    [Fact]
    public async Task ReplayBuffer_NoEventsSeen_AnyAfterSequence_NoGap_NoEvents()
    {
        // No events ever seen: nothing can have been missed, so gap must be false.
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        await using SessionEventDistributor distributor = CreateDistributor(
            source.Reader,
            replayBufferCapacity: 0,
            replayRetentionSeconds: 0);

        // Pump not started — no events arrive.
        bool found = distributor.TryGetReplayFrom(0, out IReadOnlyList<MxEvent> replay, out bool gap);

        Assert.True(found);
        Assert.False(gap);
        Assert.Empty(replay);
    }

    [Fact]
    public async Task ReplayBuffer_AfterSequenceMaxValue_WithRetainedEvents_NoGap_NoNewEvents()
    {
        // ulong.MaxValue as afterSequence: afterSequence + 1 would wrap to 0, which the
        // old code used to compare against oldestRetained, falsely reporting gap=true.
        // The corrected formula must yield gap=false and an empty replay list.
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        await using SessionEventDistributor distributor = CreateDistributor(
            source.Reader,
            replayBufferCapacity: 10,
            replayRetentionSeconds: 0);
        await distributor.StartAsync(CancellationToken.None);

        using IEventSubscriberLease lease = distributor.Register();
        source.Writer.TryWrite(Event(1));
        _ = await ReadOneAsync(lease.Reader);

        bool found = distributor.TryGetReplayFrom(ulong.MaxValue, out IReadOnlyList<MxEvent> replay, out bool gap);

        Assert.True(found);
        Assert.False(gap);
        Assert.Empty(replay);
    }

    [Fact]
    public async Task SlowSubscriberOverflow_DisconnectsOnlyThatSubscriber_PumpAndOtherKeepRunning()
    {
        // Per-subscriber backpressure isolation (Task 5): one subscriber stops reading and
        // overflows its own tiny channel; it is disconnected with an EventQueueOverflow fault
        // while a second, healthy subscriber keeps receiving and the pump keeps pumping.
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        int overflowCalls = 0;

        // Separate fields for the bool value and the "set" flag so both can use
        // Volatile.Read/Write; bool? is not valid for the volatile keyword on a local.
        // Interlocked.Increment on the pump thread is the store for overflowCalls;
        // Volatile.Read/Write provide ordering for observedIsOnlySubscriber.
        int observedIsOnlySubscriberSet = 0;
        bool observedIsOnlySubscriberValue = false;

        await using SessionEventDistributor distributor = new(
            "session-test",
            ct => source.Reader.ReadAllAsync(ct),
            subscriberQueueCapacity: 2,
            replayBufferCapacity: 1024,
            replayRetentionSeconds: 0,
            NullLogger.Instance,
            TimeProvider.System,
            (isOnlySubscriber, _) =>
            {
                Interlocked.Increment(ref overflowCalls);
                Volatile.Write(ref observedIsOnlySubscriberValue, isOnlySubscriber);
                Volatile.Write(ref observedIsOnlySubscriberSet, 1);
            },
            singleSubscriberMode: false);
        await distributor.StartAsync(CancellationToken.None);

        // Slow subscriber: registered but never read, so its capacity-2 channel fills.
        using IEventSubscriberLease slow = distributor.Register();

        // Healthy subscriber: drains promptly throughout.
        using IEventSubscriberLease healthy = distributor.Register();

        // Push more events than the slow subscriber's channel can hold while the healthy one
        // keeps up. The slow channel overflows; the healthy channel does not.
        for (ulong sequence = 1; sequence <= 10; sequence++)
        {
            source.Writer.TryWrite(Event(sequence));
            MxEvent received = await ReadOneAsync(healthy.Reader);
            Assert.Equal(sequence, received.WorkerSequence);
        }

        // The slow subscriber is disconnected with the overflow fault.
        SessionManagerException fault = await Assert.ThrowsAsync<SessionManagerException>(
            async () => await DrainUntilFaultAsync(slow.Reader));
        Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, fault.ErrorCode);

        // Multi-subscriber mode, so isOnlySubscriber is always false (Task 8 mode-gating).
        // Use Interlocked.Read / Volatile.Read so the test-thread reads are ordered after the
        // pump-thread writes, avoiding a data race by the C# memory model.
        Assert.Equal(1, Volatile.Read(ref overflowCalls));
        Assert.Equal(1, Volatile.Read(ref observedIsOnlySubscriberSet));
        Assert.False(Volatile.Read(ref observedIsOnlySubscriberValue));
        Assert.Equal(1, distributor.SubscriberCount);

        // The pump is still running and the healthy subscriber still receives new events.
        source.Writer.TryWrite(Event(11));
        MxEvent afterOverflow = await ReadOneAsync(healthy.Reader);
        Assert.Equal(11ul, afterOverflow.WorkerSequence);
    }

    [Fact]
    public async Task SlowSubscriberOverflow_WithMultipleSubscribers_HandlerSeesIsOnlySubscriberFalse_OtherKeepsReceiving()
    {
        // Distributor-level pin for "FailFast with multiple subscribers degrades to
        // disconnect-only (no session fault)": in multi-subscriber mode isOnlySubscriber is
        // always false (Task 8 mode-gating), so a FailFast-wired handler must NOT fault the
        // session. This test drives the distributor directly (without GatewaySession) in
        // multi-subscriber mode with two subscribers and a FailFast-style overflow handler
        // seam, overflows the slow one, and asserts (a) isOnlySubscriber==false, (b) the other
        // subscriber keeps receiving, and (c) the pump keeps running.
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        bool handlerFiredWithFalse = false;
        bool sessionFaultWouldBeCalled = false; // tracks if a FailFast path would fault

        await using SessionEventDistributor distributor = new(
            "session-multi-sub",
            ct => source.Reader.ReadAllAsync(ct),
            subscriberQueueCapacity: 2,
            replayBufferCapacity: 0,
            replayRetentionSeconds: 0,
            NullLogger.Instance,
            TimeProvider.System,
            (isOnlySubscriber, _) =>
            {
                if (!isOnlySubscriber)
                {
                    // Multi-subscriber: FailFast degrades to disconnect-only.
                    Volatile.Write(ref handlerFiredWithFalse, true);
                }
                else
                {
                    // Single-subscriber: FailFast would fault the session — must not happen here.
                    Volatile.Write(ref sessionFaultWouldBeCalled, true);
                }
            },
            singleSubscriberMode: false);
        await distributor.StartAsync(CancellationToken.None);

        // Slow subscriber: never reads, so capacity-2 channel overflows quickly.
        using IEventSubscriberLease slow = distributor.Register();

        // Healthy subscriber: drains every event promptly.
        using IEventSubscriberLease healthy = distributor.Register();

        // Drive enough events to overflow the slow subscriber's channel.
        for (ulong sequence = 1; sequence <= 10; sequence++)
        {
            source.Writer.TryWrite(Event(sequence));
            _ = await ReadOneAsync(healthy.Reader);
        }

        // Slow subscriber is disconnected with the overflow fault.
        SessionManagerException fault = await Assert.ThrowsAsync<SessionManagerException>(
            async () => await DrainUntilFaultAsync(slow.Reader));
        Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, fault.ErrorCode);

        // The handler saw isOnlySubscriber==false (multi-subscriber degradation path).
        Assert.True(Volatile.Read(ref handlerFiredWithFalse));

        // The FailFast session-fault branch was NOT taken (session stays Ready equivalent).
        Assert.False(Volatile.Read(ref sessionFaultWouldBeCalled));

        // The pump and healthy subscriber are unaffected.
        source.Writer.TryWrite(Event(11));
        MxEvent afterOverflow = await ReadOneAsync(healthy.Reader);
        Assert.Equal(11ul, afterOverflow.WorkerSequence);
    }

    [Fact]
    public async Task InternalSubscriberOverflow_HandlerSeesIsOnlySubscriberFalse_ProvingCountExcludesInternal()
    {
        // Issue 3: verifies that CountExternalSubscribers() excludes the internal dashboard
        // subscriber, so a FailFast policy would NOT fault the session even when the internal
        // subscriber is the ONLY registered subscriber. The overflow handler receives
        // isOnlySubscriber==false (not true) because the overflowing subscriber is internal
        // and is therefore excluded from the external-subscriber count.
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        int observedIsOnlySubscriberSet = 0;
        bool observedIsOnlySubscriberValue = false;
        bool observedIsInternalValue = false;

        await using SessionEventDistributor distributor = new(
            "session-internal-overflow",
            ct => source.Reader.ReadAllAsync(ct),
            subscriberQueueCapacity: 2,
            replayBufferCapacity: 0,
            replayRetentionSeconds: 0,
            NullLogger.Instance,
            TimeProvider.System,
            (isOnlySubscriber, isInternal) =>
            {
                Volatile.Write(ref observedIsOnlySubscriberValue, isOnlySubscriber);
                Volatile.Write(ref observedIsInternalValue, isInternal);
                Volatile.Write(ref observedIsOnlySubscriberSet, 1);
            });
        await distributor.StartAsync(CancellationToken.None);

        // Register ONLY an internal subscriber — no external subscriber is attached.
        using IEventSubscriberLease internalLease = distributor.Register(isInternal: true);

        // Push enough events to overflow the capacity-2 internal subscriber channel.
        for (ulong sequence = 1; sequence <= 10; sequence++)
        {
            source.Writer.TryWrite(Event(sequence));
        }

        // The internal subscriber is disconnected with the overflow fault.
        SessionManagerException fault = await Assert.ThrowsAsync<SessionManagerException>(
            async () => await DrainUntilFaultAsync(internalLease.Reader));
        Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, fault.ErrorCode);

        // Wait for the handler to fire (it runs on the pump thread).
        await WaitUntilAsync(() => Volatile.Read(ref observedIsOnlySubscriberSet) != 0);

        // isOnlySubscriber must be FALSE even though the internal subscriber was the ONLY
        // subscriber — CountExternalSubscribers excludes it, so a FailFast policy on the
        // external count would NOT fault the session.
        Assert.True(
            Volatile.Read(ref observedIsOnlySubscriberSet) == 1,
            "Overflow handler should have fired.");
        Assert.False(
            Volatile.Read(ref observedIsOnlySubscriberValue),
            "isOnlySubscriber must be false for an internal subscriber (CountExternalSubscribers excludes it).");
        Assert.True(
            Volatile.Read(ref observedIsInternalValue),
            "isInternal must be true for a subscriber registered with isInternal: true.");
    }

    [Fact]
    public async Task SingleSubscriberMode_LoneExternalOverflow_HandlerSeesIsOnlySubscriberTrue()
    {
        // Task 8 mode-gating: in single-subscriber mode a lone external subscriber that
        // overflows reports isOnlySubscriber==true, so the legacy FailFast session-fault path
        // is preserved. The decision is gated on the fixed session mode, NOT a live count, so
        // it is race-free.
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        int observedSet = 0;
        bool observedValue = false;

        await using SessionEventDistributor distributor = new(
            "session-single-sub",
            ct => source.Reader.ReadAllAsync(ct),
            subscriberQueueCapacity: 2,
            replayBufferCapacity: 0,
            replayRetentionSeconds: 0,
            NullLogger.Instance,
            TimeProvider.System,
            (isOnlySubscriber, _) =>
            {
                Volatile.Write(ref observedValue, isOnlySubscriber);
                Volatile.Write(ref observedSet, 1);
            },
            singleSubscriberMode: true);
        await distributor.StartAsync(CancellationToken.None);

        using IEventSubscriberLease external = distributor.Register();
        for (ulong sequence = 1; sequence <= 10; sequence++)
        {
            source.Writer.TryWrite(Event(sequence));
        }

        SessionManagerException fault = await Assert.ThrowsAsync<SessionManagerException>(
            async () => await DrainUntilFaultAsync(external.Reader));
        Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, fault.ErrorCode);

        await WaitUntilAsync(() => Volatile.Read(ref observedSet) != 0);

        // Guard: ensure the handler actually fired before asserting its observed value.
        // Without this the test could pass vacuously if the overflow never triggered.
        Assert.Equal(1, Volatile.Read(ref observedSet));
        Assert.True(
            Volatile.Read(ref observedValue),
            "isOnlySubscriber must be true for a lone external subscriber in single-subscriber mode.");
    }

    [Fact]
    public async Task RegisterWithReplay_WithinRetainedWindow_ReturnsNewerEvents_NoGap_ThenLive()
    {
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        await using SessionEventDistributor distributor = CreateDistributor(
            source.Reader,
            replayBufferCapacity: 10,
            replayRetentionSeconds: 0);
        await distributor.StartAsync(CancellationToken.None);

        // A primer subscriber forces the pump to retain events 1..5 deterministically.
        using IEventSubscriberLease primer = distributor.Register();
        for (ulong sequence = 1; sequence <= 5; sequence++)
        {
            source.Writer.TryWrite(Event(sequence));
            _ = await ReadOneAsync(primer.Reader);
        }

        // Resume after sequence 2: retained window [1..5] still covers it — no gap, replay 3..5.
        using IEventSubscriberLease resume = distributor.RegisterWithReplay(
            2,
            out IReadOnlyList<MxEvent> replay,
            out bool gap,
            out ulong oldestAvailable,
            out ulong liveResume);

        Assert.False(gap);
        Assert.Equal(new ulong[] { 3, 4, 5 }, replay.Select(e => e.WorkerSequence));
        Assert.Equal(5ul, liveResume);

        // OldestAvailableSequence is 0 when gap == false (meaningful only when gap is true).
        Assert.Equal(0ul, oldestAvailable);

        // A subsequent live event flows to the resumed subscriber's channel.
        source.Writer.TryWrite(Event(6));
        MxEvent live = await ReadOneAsync(resume.Reader);
        Assert.Equal(6ul, live.WorkerSequence);
    }

    [Fact]
    public async Task RegisterWithReplay_BelowOldestRetained_ReportsGap_AndOldestAvailable()
    {
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        await using SessionEventDistributor distributor = CreateDistributor(
            source.Reader,
            replayBufferCapacity: 3,
            replayRetentionSeconds: 0);
        await distributor.StartAsync(CancellationToken.None);

        using IEventSubscriberLease primer = distributor.Register();
        for (ulong sequence = 1; sequence <= 5; sequence++)
        {
            source.Writer.TryWrite(Event(sequence));
            _ = await ReadOneAsync(primer.Reader);
        }

        // Capacity 3 retains 3,4,5; events 1,2 were evicted. Resume after 0 => gap, oldest=3.
        using IEventSubscriberLease resume = distributor.RegisterWithReplay(
            0,
            out IReadOnlyList<MxEvent> replay,
            out bool gap,
            out ulong oldestAvailable,
            out ulong liveResume);

        Assert.True(gap);
        Assert.Equal(3ul, oldestAvailable);
        Assert.Equal(new ulong[] { 3, 4, 5 }, replay.Select(e => e.WorkerSequence));
        Assert.Equal(5ul, liveResume);
    }

    [Fact]
    public async Task RegisterWithReplay_NothingRetainedNewer_LiveResumeEqualsAfterSequence_NoGap()
    {
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        await using SessionEventDistributor distributor = CreateDistributor(
            source.Reader,
            replayBufferCapacity: 10,
            replayRetentionSeconds: 0);
        await distributor.StartAsync(CancellationToken.None);

        using IEventSubscriberLease primer = distributor.Register();
        for (ulong sequence = 1; sequence <= 3; sequence++)
        {
            source.Writer.TryWrite(Event(sequence));
            _ = await ReadOneAsync(primer.Reader);
        }

        // Resume after 3 (newest retained): nothing newer, fully caught up — no gap, empty
        // replay, and the live filter resumes after the requested watermark unchanged.
        using IEventSubscriberLease resume = distributor.RegisterWithReplay(
            3,
            out IReadOnlyList<MxEvent> replay,
            out bool gap,
            out ulong oldestAvailable,
            out ulong liveResume);

        Assert.False(gap);
        Assert.Empty(replay);
        Assert.Equal(3ul, liveResume);

        // OldestAvailableSequence is 0 when gap == false (meaningful only when gap is true).
        Assert.Equal(0ul, oldestAvailable);

        source.Writer.TryWrite(Event(4));
        MxEvent live = await ReadOneAsync(resume.Reader);
        Assert.Equal(4ul, live.WorkerSequence);
    }

    /// <summary>
    /// Reads and discards events from <paramref name="reader"/> until the channel's
    /// completion fault surfaces as an exception. Fails the test if the channel
    /// completes without a fault. Every wait is bounded by <see cref="ReadTimeout"/>.
    /// </summary>
    private static async Task DrainUntilFaultAsync(ChannelReader<MxEvent> reader)
    {
        // Drains any buffered events, then surfaces the channel's completion fault (if any)
        // by awaiting the final WaitToReadAsync past the buffered tail.
        // If WaitToReadAsync returns false (graceful completion rather than a fault),
        // await Completion to surface any fault stored there, then Assert.Fail so the
        // helper does not spin forever on a channel that completes without an exception.
        while (true)
        {
            bool hasMore = await reader.WaitToReadAsync().AsTask().WaitAsync(ReadTimeout);
            if (!hasMore)
            {
                // Graceful completion — propagate any stored exception, then fail.
                await reader.Completion;
                Assert.Fail("DrainUntilFaultAsync: channel completed gracefully (no fault).");
                return;
            }

            while (reader.TryRead(out _))
            {
            }
        }
    }

    /// <summary>
    /// Regression: a subscriber that registers in the window AFTER the pump has completed
    /// (its event source finished) but BEFORE the distributor is disposed must have its
    /// channel completed immediately, not left open forever. The pump has already run its
    /// final <c>CompleteAllSubscribers</c> sweep and exited, so without the
    /// register-after-completion guard the late subscriber's reader hangs indefinitely.
    /// This was observed as an order-dependent hang in
    /// <c>GatewaySessionDashboardMirrorTests</c>, where a gRPC subscriber attached after a
    /// fast-completing worker stream had already drained.
    /// </summary>
    [Fact]
    public async Task Register_AfterSourceCompletes_CompletesLateSubscriberInsteadOfHanging()
    {
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        await using SessionEventDistributor distributor = CreateDistributor(source.Reader);
        await distributor.StartAsync(CancellationToken.None);

        // An early subscriber lets us observe when the pump's final completion sweep has run.
        using IEventSubscriberLease early = distributor.Register();

        // Complete the source: the pump drains it, runs CompleteAllSubscribers, and exits.
        source.Writer.Complete();

        // Draining the early subscriber to completion proves the pump finished its sweep — so
        // a subscriber registering now is unambiguously in the register-after-completion window.
        using (CancellationTokenSource earlyCts = new(ReadTimeout))
        {
            await foreach (MxEvent _ in early.Reader.ReadAllAsync(earlyCts.Token))
            {
            }
        }

        // Register AFTER the pump has completed. The channel must be completed immediately; the
        // bounded read below must end rather than hang (the ReadTimeout converts a regression
        // into a fast OperationCanceledException failure instead of an indefinite hang).
        using IEventSubscriberLease late = distributor.Register();
        using CancellationTokenSource lateCts = new(ReadTimeout);
        await foreach (MxEvent _ in late.Reader.ReadAllAsync(lateCts.Token))
        {
        }

        Assert.False(lateCts.IsCancellationRequested);
    }

    /// <summary>
    /// Builds a distributor over <paramref name="source"/> with a replay buffer of
    /// capacity 1024 and a 300-second retention window.
    /// </summary>
    private static SessionEventDistributor CreateDistributor(ChannelReader<MxEvent> source) =>
        CreateDistributor(source, replayBufferCapacity: 1024, replayRetentionSeconds: 300);

    /// <summary>
    /// Builds a distributor named "session-test" over <paramref name="source"/> with a
    /// per-subscriber queue capacity of 64 and the given replay settings.
    /// <see cref="TimeProvider.System"/> is used when <paramref name="timeProvider"/> is null.
    /// </summary>
    private static SessionEventDistributor CreateDistributor(
        ChannelReader<MxEvent> source,
        int replayBufferCapacity,
        double replayRetentionSeconds,
        TimeProvider? timeProvider = null) =>
        new(
            "session-test",
            ct => source.ReadAllAsync(ct),
            subscriberQueueCapacity: 64,
            replayBufferCapacity: replayBufferCapacity,
            replayRetentionSeconds: replayRetentionSeconds,
            NullLogger.Instance,
            timeProvider ?? TimeProvider.System);

    /// <summary>Creates a test event for session "session-test" carrying the given worker sequence.</summary>
    private static MxEvent Event(ulong sequence) =>
        new() { SessionId = "session-test", WorkerSequence = sequence };

    /// <summary>
    /// Reads exactly one event from <paramref name="reader"/>, waiting at most
    /// <see cref="ReadTimeout"/> for it to become available.
    /// </summary>
    private static async Task<MxEvent> ReadOneAsync(ChannelReader<MxEvent> reader)
    {
        // WaitToReadAsync returns false when the channel completed with nothing left to
        // read; assert on it so that case fails with a clear message.
        bool available = await reader.WaitToReadAsync().AsTask().WaitAsync(ReadTimeout);
        Assert.True(available, "ReadOneAsync: channel completed before an event became available.");

        Assert.True(reader.TryRead(out MxEvent? value));
        return value!;
    }

    /// <summary>
    /// Drains anything still buffered in <paramref name="reader"/>, then asserts the
    /// channel is completed. Bounded so a never-completing channel fails fast.
    /// </summary>
    private static async Task AssertCompletedAsync(ChannelReader<MxEvent> reader)
    {
        // ChannelReader.Completion only completes once no more data will ever be readable,
        // so an unread buffered tail must be consumed first. ReadAllAsync ends when the
        // channel is completed and empty; the token bounds the wait.
        using CancellationTokenSource cts = new(ReadTimeout);
        await foreach (MxEvent _ in reader.ReadAllAsync(cts.Token))
        {
        }

        await reader.Completion.WaitAsync(ReadTimeout);
    }

    /// <summary>
    /// Polls <paramref name="condition"/> every 10 ms until it returns true. The delay is
    /// cancelled after <see cref="ReadTimeout"/>, so a condition that never becomes true
    /// fails the test with a cancellation exception instead of hanging.
    /// </summary>
    private static async Task WaitUntilAsync(Func<bool> condition)
    {
        using CancellationTokenSource cts = new(ReadTimeout);
        while (!condition())
        {
            await Task.Delay(10, cts.Token);
        }
    }
}