From c7a7cd1e5e30891c71165b6c86c6cd961d1f8389 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 16 Jun 2026 07:28:37 -0400 Subject: [PATCH] fix(sessions): tidy replay filter/comment; zero OldestAvailableSequence when no gap - EventStreamService: remove dead per-item sequence guard in the replay loop (RegisterWithReplay already returns only events > afterSequence) and correct the comment that falsely claimed a "per-item constraint filter" is applied; the event stream has no per-event constraint filtering today. - SessionEventDistributor.RegisterWithReplay: set oldestAvailableSequence=0 when gap==false so the implementation matches the documented contract (OldestAvailableSequence is meaningful only when Gap is true). Update the two RegisterWithReplay tests that asserted the old non-zero value in the no-gap path. - RegisterSubscriber: remove stray blank line at method entry. - SessionEventDistributorTests: add RegisterWithReplay_AfterDispose_ ThrowsObjectDisposedException to pin nested-lock disposal behavior. --- .../Grpc/EventStreamService.cs | 16 +++++------ .../Sessions/SessionEventDistributor.cs | 9 ++++--- .../Sessions/SessionEventDistributorTests.cs | 27 +++++++++++++++++-- 3 files changed, 36 insertions(+), 16 deletions(-) diff --git a/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs b/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs index 8b7ee51..d238ef4 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs @@ -132,16 +132,12 @@ public sealed class EventStreamService( foreach (MxEvent replayedEvent in replayedEvents) { - // Replayed events pass through the SAME per-item filter the live loop applies, - // so a constrained/resuming caller never sees a replayed event it could not - // have seen live. The watermark dropped events at/below the requested - // AfterWorkerSequence; the snapshot already excluded those, but this keeps the - // filter identical for replay and live. - if (replayedEvent.WorkerSequence <= request.AfterWorkerSequence) - { - continue; - } - + // RegisterWithReplay already returns only events strictly newer than + // AfterWorkerSequence, so no per-item sequence guard is needed here. + // There is no per-event constraint filter on the event stream: events are + // fanned as-is by the distributor pump. The only dedup watermark is the + // LiveResumeSequence applied in the live loop below (to drop any event + // that was both replayed and raced into the live channel). yield return replayedEvent; } diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs index d2f3e77..2b37ddf 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs @@ -295,7 +295,6 @@ public sealed class SessionEventDistributor : IAsyncDisposable private IEventSubscriberLease RegisterSubscriber(Subscriber subscriber) { - // The disposed check AND the map add happen under the same lock with no await // in between. DisposeAsync sets _disposed=true under this same lock before it // calls CompleteAllSubscribers, so once disposal has begun no further subscriber @@ -424,13 +423,15 @@ public sealed class SessionEventDistributor : IAsyncDisposable if (_replayBuffer.Count == 0) { - oldestAvailableSequence = 0; gap = _anyEventSeen && afterSequence < _highestSequenceSeen; + oldestAvailableSequence = 0; // meaningful only when gap == true; 0 here since nothing is retained } else { - oldestAvailableSequence = _replayBuffer.First!.Value.Event.WorkerSequence; - gap = oldestAvailableSequence > 0 && afterSequence < oldestAvailableSequence - 1; + ulong oldestRetained = _replayBuffer.First!.Value.Event.WorkerSequence; + gap = oldestRetained > 0 && afterSequence < oldestRetained - 1; + // Per the contract on OldestAvailableSequence: meaningful only when gap == true. + oldestAvailableSequence = gap ? oldestRetained : 0; foreach (ReplayEntry entry in _replayBuffer) { diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs index 1f3dc65..2cb8152 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs @@ -113,6 +113,27 @@ public sealed class SessionEventDistributorTests Assert.Throws(() => distributor.Register()); } + [Fact] + public async Task RegisterWithReplay_AfterDispose_ThrowsObjectDisposedException() + { + // Pins the nested-lock disposal behavior in RegisterWithReplay: the inner + // _lifecycleLock check must surface ObjectDisposedException even when the outer + // _replayLock snapshot succeeds on a disposed distributor. + Channel source = Channel.CreateUnbounded(); + SessionEventDistributor distributor = CreateDistributor(source.Reader); + await distributor.StartAsync(CancellationToken.None); + + await distributor.DisposeAsync().AsTask().WaitAsync(ReadTimeout); + + Assert.Throws(() => + distributor.RegisterWithReplay( + 0, + out _, + out _, + out _, + out _)); + } + [Fact] public async Task ReplayBuffer_OverCapacity_EvictsOldestFirst_AndReportsGap() { @@ -601,7 +622,8 @@ public sealed class SessionEventDistributorTests Assert.False(gap); Assert.Equal(new ulong[] { 3, 4, 5 }, replay.Select(e => e.WorkerSequence)); Assert.Equal(5ul, liveResume); - Assert.Equal(1ul, oldestAvailable); + // OldestAvailableSequence is 0 when gap == false (meaningful only when gap is true). + Assert.Equal(0ul, oldestAvailable); // A subsequent live event flows to the resumed subscriber's channel. source.Writer.TryWrite(Event(6)); @@ -669,7 +691,8 @@ public sealed class SessionEventDistributorTests Assert.False(gap); Assert.Empty(replay); Assert.Equal(3ul, liveResume); - Assert.Equal(1ul, oldestAvailable); + // OldestAvailableSequence is 0 when gap == false (meaningful only when gap is true). + Assert.Equal(0ul, oldestAvailable); source.Writer.TryWrite(Event(4)); MxEvent live = await ReadOneAsync(resume.Reader);