fix(sessions): tidy replay filter/comment; zero OldestAvailableSequence when no gap
- EventStreamService: remove dead per-item sequence guard in the replay loop (RegisterWithReplay already returns only events > afterSequence) and correct the comment that falsely claimed a "per-item constraint filter" is applied; the event stream has no per-event constraint filtering today. - SessionEventDistributor.RegisterWithReplay: set oldestAvailableSequence=0 when gap==false so the implementation matches the documented contract (OldestAvailableSequence is meaningful only when Gap is true). Update the two RegisterWithReplay tests that asserted the old non-zero value in the no-gap path. - RegisterSubscriber: remove stray blank line at method entry. - SessionEventDistributorTests: add RegisterWithReplay_AfterDispose_ ThrowsObjectDisposedException to pin nested-lock disposal behavior.
This commit is contained in:
@@ -132,16 +132,12 @@ public sealed class EventStreamService(
|
||||
|
||||
foreach (MxEvent replayedEvent in replayedEvents)
|
||||
{
|
||||
// Replayed events pass through the SAME per-item filter the live loop applies,
|
||||
// so a constrained/resuming caller never sees a replayed event it could not
|
||||
// have seen live. The watermark dropped events at/below the requested
|
||||
// AfterWorkerSequence; the snapshot already excluded those, but this keeps the
|
||||
// filter identical for replay and live.
|
||||
if (replayedEvent.WorkerSequence <= request.AfterWorkerSequence)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
// RegisterWithReplay already returns only events strictly newer than
|
||||
// AfterWorkerSequence, so no per-item sequence guard is needed here.
|
||||
// There is no per-event constraint filter on the event stream: events are
|
||||
// fanned as-is by the distributor pump. The only dedup watermark is the
|
||||
// LiveResumeSequence applied in the live loop below (to drop any event
|
||||
// that was both replayed and raced into the live channel).
|
||||
yield return replayedEvent;
|
||||
}
|
||||
|
||||
|
||||
@@ -295,7 +295,6 @@ public sealed class SessionEventDistributor : IAsyncDisposable
|
||||
|
||||
private IEventSubscriberLease RegisterSubscriber(Subscriber subscriber)
|
||||
{
|
||||
|
||||
// The disposed check AND the map add happen under the same lock with no await
|
||||
// in between. DisposeAsync sets _disposed=true under this same lock before it
|
||||
// calls CompleteAllSubscribers, so once disposal has begun no further subscriber
|
||||
@@ -424,13 +423,15 @@ public sealed class SessionEventDistributor : IAsyncDisposable
|
||||
|
||||
if (_replayBuffer.Count == 0)
|
||||
{
|
||||
oldestAvailableSequence = 0;
|
||||
gap = _anyEventSeen && afterSequence < _highestSequenceSeen;
|
||||
oldestAvailableSequence = 0; // meaningful only when gap == true; 0 here since nothing is retained
|
||||
}
|
||||
else
|
||||
{
|
||||
oldestAvailableSequence = _replayBuffer.First!.Value.Event.WorkerSequence;
|
||||
gap = oldestAvailableSequence > 0 && afterSequence < oldestAvailableSequence - 1;
|
||||
ulong oldestRetained = _replayBuffer.First!.Value.Event.WorkerSequence;
|
||||
gap = oldestRetained > 0 && afterSequence < oldestRetained - 1;
|
||||
// Per the contract on OldestAvailableSequence: meaningful only when gap == true.
|
||||
oldestAvailableSequence = gap ? oldestRetained : 0;
|
||||
|
||||
foreach (ReplayEntry entry in _replayBuffer)
|
||||
{
|
||||
|
||||
@@ -113,6 +113,27 @@ public sealed class SessionEventDistributorTests
|
||||
Assert.Throws<ObjectDisposedException>(() => distributor.Register());
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task RegisterWithReplay_AfterDispose_ThrowsObjectDisposedException()
|
||||
{
|
||||
// Pins the nested-lock disposal behavior in RegisterWithReplay: the inner
|
||||
// _lifecycleLock check must surface ObjectDisposedException even when the outer
|
||||
// _replayLock snapshot succeeds on a disposed distributor.
|
||||
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
|
||||
SessionEventDistributor distributor = CreateDistributor(source.Reader);
|
||||
await distributor.StartAsync(CancellationToken.None);
|
||||
|
||||
await distributor.DisposeAsync().AsTask().WaitAsync(ReadTimeout);
|
||||
|
||||
Assert.Throws<ObjectDisposedException>(() =>
|
||||
distributor.RegisterWithReplay(
|
||||
0,
|
||||
out _,
|
||||
out _,
|
||||
out _,
|
||||
out _));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ReplayBuffer_OverCapacity_EvictsOldestFirst_AndReportsGap()
|
||||
{
|
||||
@@ -601,7 +622,8 @@ public sealed class SessionEventDistributorTests
|
||||
Assert.False(gap);
|
||||
Assert.Equal(new ulong[] { 3, 4, 5 }, replay.Select(e => e.WorkerSequence));
|
||||
Assert.Equal(5ul, liveResume);
|
||||
Assert.Equal(1ul, oldestAvailable);
|
||||
// OldestAvailableSequence is 0 when gap == false (meaningful only when gap is true).
|
||||
Assert.Equal(0ul, oldestAvailable);
|
||||
|
||||
// A subsequent live event flows to the resumed subscriber's channel.
|
||||
source.Writer.TryWrite(Event(6));
|
||||
@@ -669,7 +691,8 @@ public sealed class SessionEventDistributorTests
|
||||
Assert.False(gap);
|
||||
Assert.Empty(replay);
|
||||
Assert.Equal(3ul, liveResume);
|
||||
Assert.Equal(1ul, oldestAvailable);
|
||||
// OldestAvailableSequence is 0 when gap == false (meaningful only when gap is true).
|
||||
Assert.Equal(0ul, oldestAvailable);
|
||||
|
||||
source.Writer.TryWrite(Event(4));
|
||||
MxEvent live = await ReadOneAsync(resume.Reader);
|
||||
|
||||
Reference in New Issue
Block a user