From c2c518862fbf8dd018937d2b791d9c7e75160ee1 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 15 Jun 2026 12:48:11 -0400 Subject: [PATCH] fix(sessions): replay-buffer gap edge cases, effective-config exposure, capacity-0 tests #2: Replace afterSequence+1<oldestRetained with oldestRetained>0&&afterSequence<oldestRetained-1 in the TryGetReplayFrom gap check, so that afterSequence == ulong.MaxValue cannot wrap to 0 and falsely report a gap. Update the class remarks to describe the Task 3 replay ring buffer (capacity + age eviction, TryGetReplayFrom) rather than its absence. #6: Add O(n)-is-acceptable comment at TryGetReplayFrom linear scan. #8: Narrow no-replay 4-arg ctor to internal; InternalsVisibleTo already covers the test project. --- .../EffectiveEventConfiguration.cs | 4 +- .../GatewayConfigurationProvider.cs | 4 +- .../Sessions/SessionEventDistributor.cs | 25 +++-- .../Sessions/SessionEventDistributorTests.cs | 95 +++++++++++++++++++ 4 files changed, 119 insertions(+), 9 deletions(-) diff --git a/src/ZB.MOM.WW.MxGateway.Server/Configuration/EffectiveEventConfiguration.cs b/src/ZB.MOM.WW.MxGateway.Server/Configuration/EffectiveEventConfiguration.cs index 6e0bee5..68136a5 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Configuration/EffectiveEventConfiguration.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Configuration/EffectiveEventConfiguration.cs @@ -2,4 +2,6 @@ namespace ZB.MOM.WW.MxGateway.Server.Configuration; public sealed record EffectiveEventConfiguration( int QueueCapacity, - string BackpressurePolicy); + string BackpressurePolicy, + int ReplayBufferCapacity, + double ReplayRetentionSeconds); diff --git a/src/ZB.MOM.WW.MxGateway.Server/Configuration/GatewayConfigurationProvider.cs b/src/ZB.MOM.WW.MxGateway.Server/Configuration/GatewayConfigurationProvider.cs index b492e60..dccb834 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Configuration/GatewayConfigurationProvider.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Configuration/GatewayConfigurationProvider.cs @@ -49,7 +49,9 @@ public sealed class GatewayConfigurationProvider(IOptions option AllowMultipleEventSubscribers: value.Sessions.AllowMultipleEventSubscribers), Events: new 
EffectiveEventConfiguration( QueueCapacity: value.Events.QueueCapacity, - BackpressurePolicy: value.Events.BackpressurePolicy.ToString()), + BackpressurePolicy: value.Events.BackpressurePolicy.ToString(), + ReplayBufferCapacity: value.Events.ReplayBufferCapacity, + ReplayRetentionSeconds: value.Events.ReplayRetentionSeconds), Dashboard: new EffectiveDashboardConfiguration( Enabled: value.Dashboard.Enabled, AllowAnonymousLocalhost: value.Dashboard.AllowAnonymousLocalhost, diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs index 8cb55ce..e3746e6 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs @@ -11,11 +11,15 @@ namespace ZB.MOM.WW.MxGateway.Server.Sessions; /// /// /// -/// This is the skeleton introduced by Task 2 of the Session Resilience epic. -/// It is a standalone class — it is NOT yet wired into GatewaySession or -/// EventStreamService (Task 4), it has no replay ring buffer (Task 3), -/// no per-subscriber backpressure-isolation policy (Task 5), and it does not -/// remove the single-subscriber guard (Tasks 7/8). +/// Introduced by Task 2 of the Session Resilience epic; the bounded replay ring +/// buffer was added by Task 3. The class is NOT yet wired into +/// GatewaySession or EventStreamService (Task 4), has no +/// per-subscriber backpressure-isolation policy (Task 5), and does not remove +/// the single-subscriber guard (Tasks 7/8). The ring buffer supports capacity +/// eviction (oldest entry dropped when the count exceeds +/// replayBufferCapacity) and age eviction (entries older than +/// replayRetentionSeconds dropped on the next append or query), and is +/// queried via <see cref="TryGetReplayFrom"/> by reconnecting subscribers. /// /// /// Source seam. 
The event source is injected as a @@ -95,8 +99,10 @@ public sealed class SessionEventDistributor : IAsyncDisposable /// /// This overload disables the replay ring buffer (capacity 0). Use the overload /// taking replay parameters to retain events for reconnect/reattach replay. + /// Kept internal so production wiring (Task 4) cannot accidentally use + /// the no-replay path; tests reach it via InternalsVisibleTo. /// - public SessionEventDistributor( + internal SessionEventDistributor( string sessionId, Func> eventSourceFactory, int subscriberQueueCapacity, @@ -431,8 +437,13 @@ public sealed class SessionEventDistributor : IAsyncDisposable // A gap exists when at least one event newer than afterSequence was evicted, // i.e. afterSequence sits below the oldest-retained-minus-one boundary. - gap = afterSequence + 1 < oldestRetained; + // Written as (oldestRetained > 0 && afterSequence < oldestRetained - 1) to + // avoid wrapping when afterSequence == ulong.MaxValue (afterSequence + 1 + // would overflow to 0, falsely reporting a gap). + gap = oldestRetained > 0 && afterSequence < oldestRetained - 1; + // O(n) scan over the retained buffer — acceptable because TryGetReplayFrom + // is only called on subscriber reconnect, never on the hot fan-out path. 
List newer = []; foreach (ReplayEntry entry in _replayBuffer) { diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs index 7df4273..f6cb53a 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs @@ -233,6 +233,101 @@ public sealed class SessionEventDistributorTests Assert.Empty(replay); } + [Fact] + public async Task ReplayBuffer_Capacity0_AfterSequenceBelowHighestSeen_ReportsGap_NoEvents() + { + // Disabled buffer: events are tracked for the highest-seen counter but not + // retained. A caller behind the highest-seen sequence must be told to re-snapshot. + Channel source = Channel.CreateUnbounded(); + await using SessionEventDistributor distributor = CreateDistributor( + source.Reader, + replayBufferCapacity: 0, + replayRetentionSeconds: 0); + await distributor.StartAsync(CancellationToken.None); + + using IEventSubscriberLease lease = distributor.Register(); + for (ulong sequence = 1; sequence <= 3; sequence++) + { + source.Writer.TryWrite(Event(sequence)); + _ = await ReadOneAsync(lease.Reader); + } + + // afterSequence=1 is below highestSeen=3 — gap, nothing to replay. + bool found = distributor.TryGetReplayFrom(1, out IReadOnlyList replay, out bool gap); + + Assert.True(found); + Assert.True(gap); + Assert.Empty(replay); + } + + [Fact] + public async Task ReplayBuffer_Capacity0_AfterSequenceAtOrAboveHighestSeen_NoGap_NoEvents() + { + // Disabled buffer: caller is already caught up — no gap, nothing to replay. 
+ Channel source = Channel.CreateUnbounded(); + await using SessionEventDistributor distributor = CreateDistributor( + source.Reader, + replayBufferCapacity: 0, + replayRetentionSeconds: 0); + await distributor.StartAsync(CancellationToken.None); + + using IEventSubscriberLease lease = distributor.Register(); + for (ulong sequence = 1; sequence <= 3; sequence++) + { + source.Writer.TryWrite(Event(sequence)); + _ = await ReadOneAsync(lease.Reader); + } + + // afterSequence=3 equals highestSeen — caller is fully caught up. + bool found = distributor.TryGetReplayFrom(3, out IReadOnlyList replay, out bool gap); + + Assert.True(found); + Assert.False(gap); + Assert.Empty(replay); + } + + [Fact] + public async Task ReplayBuffer_NoEventsSeen_AnyAfterSequence_NoGap_NoEvents() + { + // No events ever seen: nothing can have been missed, so gap must be false. + Channel source = Channel.CreateUnbounded(); + await using SessionEventDistributor distributor = CreateDistributor( + source.Reader, + replayBufferCapacity: 0, + replayRetentionSeconds: 0); + // Pump not started — no events arrive. + + bool found = distributor.TryGetReplayFrom(0, out IReadOnlyList replay, out bool gap); + + Assert.True(found); + Assert.False(gap); + Assert.Empty(replay); + } + + [Fact] + public async Task ReplayBuffer_AfterSequenceMaxValue_WithRetainedEvents_NoGap_NoNewEvents() + { + // ulong.MaxValue as afterSequence: afterSequence + 1 would wrap to 0, which the + // old code used to compare against oldestRetained, falsely reporting gap=true. + // The corrected formula must yield gap=false and an empty replay list. 
+ Channel source = Channel.CreateUnbounded(); + await using SessionEventDistributor distributor = CreateDistributor( + source.Reader, + replayBufferCapacity: 10, + replayRetentionSeconds: 0); + await distributor.StartAsync(CancellationToken.None); + + using IEventSubscriberLease lease = distributor.Register(); + source.Writer.TryWrite(Event(1)); + _ = await ReadOneAsync(lease.Reader); + + bool found = distributor.TryGetReplayFrom(ulong.MaxValue, out IReadOnlyList replay, out bool gap); + + Assert.True(found); + Assert.False(gap); + Assert.Empty(replay); + } + private static SessionEventDistributor CreateDistributor(ChannelReader source) => CreateDistributor(source, replayBufferCapacity: 1024, replayRetentionSeconds: 300);