using System.Threading.Channels;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Time.Testing;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Server.Sessions;

namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Sessions;

/// <summary>
/// Concurrency and fan-out tests for <see cref="SessionEventDistributor"/>, the
/// Session Resilience epic's per-session event pump. One pump drains the source
/// exactly once and fans every event to N independent per-subscriber channels.
/// Every async wait is bounded so a fan-out or shutdown deadlock fails fast.
/// </summary>
public sealed class SessionEventDistributorTests
{
    /// <summary>Upper bound applied to every asynchronous wait in this class.</summary>
    private static readonly TimeSpan ReadTimeout = TimeSpan.FromSeconds(5);

    [Fact]
    public async Task TwoSubscribers_BothReceiveFannedEventsInOrder()
    {
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        await using SessionEventDistributor distributor = CreateDistributor(source.Reader);
        await distributor.StartAsync(CancellationToken.None);

        using IEventSubscriberLease leaseA = distributor.Register();
        using IEventSubscriberLease leaseB = distributor.Register();

        source.Writer.TryWrite(Event(1));
        source.Writer.TryWrite(Event(2));

        MxEvent a1 = await ReadOneAsync(leaseA.Reader);
        MxEvent a2 = await ReadOneAsync(leaseA.Reader);
        MxEvent b1 = await ReadOneAsync(leaseB.Reader);
        MxEvent b2 = await ReadOneAsync(leaseB.Reader);

        Assert.Equal(1ul, a1.WorkerSequence);
        Assert.Equal(2ul, a2.WorkerSequence);
        Assert.Equal(1ul, b1.WorkerSequence);
        Assert.Equal(2ul, b2.WorkerSequence);
    }

    [Fact]
    public async Task DisposingOneLease_StopsItsDelivery_OtherKeepsReceiving()
    {
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        await using SessionEventDistributor distributor = CreateDistributor(source.Reader);
        await distributor.StartAsync(CancellationToken.None);

        // leaseA is disposed explicitly below, so it is deliberately not a using declaration.
        IEventSubscriberLease leaseA = distributor.Register();
        using IEventSubscriberLease leaseB = distributor.Register();

        source.Writer.TryWrite(Event(1));
        _ = await ReadOneAsync(leaseA.Reader);
        _ = await ReadOneAsync(leaseB.Reader);

        leaseA.Dispose();

        // A's reader must complete (no more delivery) after dispose.
        await AssertCompletedAsync(leaseA.Reader);

        // B still receives subsequent events.
        source.Writer.TryWrite(Event(2));
        MxEvent b2 = await ReadOneAsync(leaseB.Reader);
        Assert.Equal(2ul, b2.WorkerSequence);
    }

    [Fact]
    public async Task SubscriberRegisteredAfterStart_ReceivesEventsEmittedAfterRegistration()
    {
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        await using SessionEventDistributor distributor = CreateDistributor(source.Reader);
        await distributor.StartAsync(CancellationToken.None);

        using IEventSubscriberLease leaseA = distributor.Register();
        source.Writer.TryWrite(Event(1));
        _ = await ReadOneAsync(leaseA.Reader);

        // Late subscriber: only sees events emitted after it registered.
        using IEventSubscriberLease leaseB = distributor.Register();
        source.Writer.TryWrite(Event(2));

        MxEvent b = await ReadOneAsync(leaseB.Reader);
        Assert.Equal(2ul, b.WorkerSequence);
    }

    [Fact]
    public async Task DisposingDistributor_CompletesAllSubscriberChannels_AndStopsPump()
    {
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        SessionEventDistributor distributor = CreateDistributor(source.Reader);
        await distributor.StartAsync(CancellationToken.None);

        using IEventSubscriberLease leaseA = distributor.Register();
        using IEventSubscriberLease leaseB = distributor.Register();

        // Bounded so a shutdown hang fails fast.
        await distributor.DisposeAsync().AsTask().WaitAsync(ReadTimeout);

        await AssertCompletedAsync(leaseA.Reader);
        await AssertCompletedAsync(leaseB.Reader);
    }

    [Fact]
    public async Task Register_AfterDispose_ThrowsObjectDisposedException()
    {
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        SessionEventDistributor distributor = CreateDistributor(source.Reader);
        await distributor.StartAsync(CancellationToken.None);

        await distributor.DisposeAsync().AsTask().WaitAsync(ReadTimeout);

        Assert.Throws<ObjectDisposedException>(() => distributor.Register());
    }

    [Fact]
    public async Task ReplayBuffer_OverCapacity_EvictsOldestFirst_AndReportsGap()
    {
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        await using SessionEventDistributor distributor = CreateDistributor(
            source.Reader,
            replayBufferCapacity: 3,
            replayRetentionSeconds: 0);
        await distributor.StartAsync(CancellationToken.None);

        // A live subscriber forces the pump to fan (and thereby retain) each event,
        // and gives us a deterministic point to know the pump has processed event 5.
        using IEventSubscriberLease lease = distributor.Register();

        for (ulong sequence = 1; sequence <= 5; sequence++)
        {
            source.Writer.TryWrite(Event(sequence));
        }

        for (ulong sequence = 1; sequence <= 5; sequence++)
        {
            MxEvent e = await ReadOneAsync(lease.Reader);
            Assert.Equal(sequence, e.WorkerSequence);
        }

        // Capacity 3 retains only the newest three: sequences 3, 4, 5. Events 1 and 2
        // were evicted, so a caller asking from 0 missed events => gap=true, and it
        // gets only the retained tail.
        bool found = distributor.TryGetReplayFrom(0, out IReadOnlyList<MxEvent> replay, out bool gap);

        Assert.True(found);
        Assert.True(gap);
        Assert.Equal(new ulong[] { 3, 4, 5 }, replay.Select(e => e.WorkerSequence));
    }

    [Fact]
    public async Task ReplayBuffer_WithinRetainedWindow_ReturnsNewerEvents_NoGap()
    {
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        await using SessionEventDistributor distributor = CreateDistributor(
            source.Reader,
            replayBufferCapacity: 10,
            replayRetentionSeconds: 0);
        await distributor.StartAsync(CancellationToken.None);

        using IEventSubscriberLease lease = distributor.Register();

        for (ulong sequence = 1; sequence <= 5; sequence++)
        {
            source.Writer.TryWrite(Event(sequence));
            _ = await ReadOneAsync(lease.Reader);
        }

        // afterSequence 2 is still inside the retained window [1..5], so no gap and
        // exactly the newer events 3, 4, 5 come back.
        bool found = distributor.TryGetReplayFrom(2, out IReadOnlyList<MxEvent> replay, out bool gap);

        Assert.True(found);
        Assert.False(gap);
        Assert.Equal(new ulong[] { 3, 4, 5 }, replay.Select(e => e.WorkerSequence));
    }

    [Fact]
    public async Task ReplayBuffer_AgedEntries_AreEvictedAfterRetentionElapses()
    {
        FakeTimeProvider time = new(new DateTimeOffset(2026, 1, 1, 0, 0, 0, TimeSpan.Zero));
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        await using SessionEventDistributor distributor = CreateDistributor(
            source.Reader,
            replayBufferCapacity: 100,
            replayRetentionSeconds: 30,
            timeProvider: time);
        await distributor.StartAsync(CancellationToken.None);

        using IEventSubscriberLease lease = distributor.Register();

        // Two old events, then advance the clock well past the retention window.
        source.Writer.TryWrite(Event(1));
        source.Writer.TryWrite(Event(2));
        _ = await ReadOneAsync(lease.Reader);
        _ = await ReadOneAsync(lease.Reader);

        time.Advance(TimeSpan.FromSeconds(60));

        // A fresh event triggers age-eviction of the now-stale entries 1 and 2.
        source.Writer.TryWrite(Event(3));
        _ = await ReadOneAsync(lease.Reader);

        bool found = distributor.TryGetReplayFrom(0, out IReadOnlyList<MxEvent> replay, out bool gap);

        Assert.True(found);

        // Events 1 and 2 aged out; only 3 remains, and 0 predates the oldest retained.
        Assert.Equal(new ulong[] { 3 }, replay.Select(e => e.WorkerSequence));
        Assert.True(gap);
    }

    [Fact]
    public async Task ReplayBuffer_AfterSequenceNewerThanAllRetained_ReturnsEmpty_NoGap()
    {
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        await using SessionEventDistributor distributor = CreateDistributor(
            source.Reader,
            replayBufferCapacity: 10,
            replayRetentionSeconds: 0);
        await distributor.StartAsync(CancellationToken.None);

        using IEventSubscriberLease lease = distributor.Register();

        for (ulong sequence = 1; sequence <= 3; sequence++)
        {
            source.Writer.TryWrite(Event(sequence));
            _ = await ReadOneAsync(lease.Reader);
        }

        // afterSequence 3 is at/after the newest retained; nothing newer, and the
        // caller is fully caught up => empty list, gap=false.
        bool found = distributor.TryGetReplayFrom(3, out IReadOnlyList<MxEvent> replay, out bool gap);

        Assert.True(found);
        Assert.False(gap);
        Assert.Empty(replay);
    }

    [Fact]
    public async Task ReplayBuffer_Capacity0_AfterSequenceBelowHighestSeen_ReportsGap_NoEvents()
    {
        // Disabled buffer: events are tracked for the highest-seen counter but not
        // retained. A caller behind the highest-seen sequence must be told to re-snapshot.
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        await using SessionEventDistributor distributor = CreateDistributor(
            source.Reader,
            replayBufferCapacity: 0,
            replayRetentionSeconds: 0);
        await distributor.StartAsync(CancellationToken.None);

        using IEventSubscriberLease lease = distributor.Register();

        for (ulong sequence = 1; sequence <= 3; sequence++)
        {
            source.Writer.TryWrite(Event(sequence));
            _ = await ReadOneAsync(lease.Reader);
        }

        // afterSequence=1 is below highestSeen=3 — gap, nothing to replay.
        bool found = distributor.TryGetReplayFrom(1, out IReadOnlyList<MxEvent> replay, out bool gap);

        Assert.True(found);
        Assert.True(gap);
        Assert.Empty(replay);
    }

    [Fact]
    public async Task ReplayBuffer_Capacity0_AfterSequenceAtOrAboveHighestSeen_NoGap_NoEvents()
    {
        // Disabled buffer: caller is already caught up — no gap, nothing to replay.
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        await using SessionEventDistributor distributor = CreateDistributor(
            source.Reader,
            replayBufferCapacity: 0,
            replayRetentionSeconds: 0);
        await distributor.StartAsync(CancellationToken.None);

        using IEventSubscriberLease lease = distributor.Register();

        for (ulong sequence = 1; sequence <= 3; sequence++)
        {
            source.Writer.TryWrite(Event(sequence));
            _ = await ReadOneAsync(lease.Reader);
        }

        // afterSequence=3 equals highestSeen — caller is fully caught up.
        bool found = distributor.TryGetReplayFrom(3, out IReadOnlyList<MxEvent> replay, out bool gap);

        Assert.True(found);
        Assert.False(gap);
        Assert.Empty(replay);
    }

    [Fact]
    public async Task ReplayBuffer_NoEventsSeen_AnyAfterSequence_NoGap_NoEvents()
    {
        // No events ever seen: nothing can have been missed, so gap must be false.
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        await using SessionEventDistributor distributor = CreateDistributor(
            source.Reader,
            replayBufferCapacity: 0,
            replayRetentionSeconds: 0);

        // Pump not started — no events arrive.
        bool found = distributor.TryGetReplayFrom(0, out IReadOnlyList<MxEvent> replay, out bool gap);

        Assert.True(found);
        Assert.False(gap);
        Assert.Empty(replay);
    }

    [Fact]
    public async Task ReplayBuffer_AfterSequenceMaxValue_WithRetainedEvents_NoGap_NoNewEvents()
    {
        // ulong.MaxValue as afterSequence: afterSequence + 1 would wrap to 0, which the
        // old code used to compare against oldestRetained, falsely reporting gap=true.
        // The corrected formula must yield gap=false and an empty replay list.
        Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
        await using SessionEventDistributor distributor = CreateDistributor(
            source.Reader,
            replayBufferCapacity: 10,
            replayRetentionSeconds: 0);
        await distributor.StartAsync(CancellationToken.None);

        using IEventSubscriberLease lease = distributor.Register();
        source.Writer.TryWrite(Event(1));
        _ = await ReadOneAsync(lease.Reader);

        bool found = distributor.TryGetReplayFrom(ulong.MaxValue, out IReadOnlyList<MxEvent> replay, out bool gap);

        Assert.True(found);
        Assert.False(gap);
        Assert.Empty(replay);
    }

    /// <summary>
    /// Creates a distributor over <paramref name="source"/> with the default replay
    /// settings used by the fan-out tests (capacity 1024, 300 seconds retention).
    /// </summary>
    private static SessionEventDistributor CreateDistributor(ChannelReader<MxEvent> source) =>
        CreateDistributor(source, replayBufferCapacity: 1024, replayRetentionSeconds: 300);

    /// <summary>
    /// Creates a distributor for session "session-test" that reads from
    /// <paramref name="source"/> with a subscriber queue capacity of 64.
    /// </summary>
    /// <param name="source">Channel reader the distributor is given as its event source.</param>
    /// <param name="replayBufferCapacity">Value passed through as the replay buffer capacity.</param>
    /// <param name="replayRetentionSeconds">Value passed through as the replay retention, in seconds.</param>
    /// <param name="timeProvider">Clock to inject; <see cref="TimeProvider.System"/> when omitted.</param>
    private static SessionEventDistributor CreateDistributor(
        ChannelReader<MxEvent> source,
        int replayBufferCapacity,
        double replayRetentionSeconds,
        TimeProvider? timeProvider = null) =>
        new(
            "session-test",
            ct => source.ReadAllAsync(ct),
            subscriberQueueCapacity: 64,
            replayBufferCapacity: replayBufferCapacity,
            replayRetentionSeconds: replayRetentionSeconds,
            NullLogger<SessionEventDistributor>.Instance,
            timeProvider ?? TimeProvider.System);

    /// <summary>Builds a test event for session "session-test" with the given worker sequence.</summary>
    private static MxEvent Event(ulong sequence) =>
        new() { SessionId = "session-test", WorkerSequence = sequence };

    /// <summary>
    /// Reads exactly one event from <paramref name="reader"/>, waiting at most
    /// <see cref="ReadTimeout"/> for it to become available.
    /// </summary>
    private static async Task<MxEvent> ReadOneAsync(ChannelReader<MxEvent> reader)
    {
        // WaitToReadAsync yields false when the channel completes with nothing left to
        // read; surface that explicitly instead of as an unexplained TryRead failure.
        bool available = await reader.WaitToReadAsync().AsTask().WaitAsync(ReadTimeout);
        Assert.True(available, "The channel completed before an event arrived.");

        Assert.True(reader.TryRead(out MxEvent? value));
        return value!;
    }

    /// <summary>
    /// Asserts that <paramref name="reader"/>'s channel has been completed, waiting at
    /// most <see cref="ReadTimeout"/> so a never-completing channel fails fast.
    /// </summary>
    private static async Task AssertCompletedAsync(ChannelReader<MxEvent> reader)
    {
        // Drain anything still buffered: Completion only transitions once the channel
        // is both completed and empty, so leftover items would otherwise look like a hang.
        while (reader.TryRead(out _))
        {
        }

        await reader.Completion.WaitAsync(ReadTimeout);
    }
}