c7a7cd1e5e
- EventStreamService: remove dead per-item sequence guard in the replay loop (RegisterWithReplay already returns only events > afterSequence) and correct the comment that falsely claimed a "per-item constraint filter" is applied; the event stream has no per-event constraint filtering today.
- SessionEventDistributor.RegisterWithReplay: set oldestAvailableSequence=0 when gap==false so the implementation matches the documented contract (OldestAvailableSequence is meaningful only when Gap is true). Update the two RegisterWithReplay tests that asserted the old non-zero value in the no-gap path.
- RegisterSubscriber: remove stray blank line at method entry.
- SessionEventDistributorTests: add RegisterWithReplay_AfterDispose_ThrowsObjectDisposedException to pin nested-lock disposal behavior.
749 lines
32 KiB
C#
749 lines
32 KiB
C#
using System.Threading.Channels;
|
|
using Microsoft.Extensions.Logging.Abstractions;
|
|
using Microsoft.Extensions.Time.Testing;
|
|
using ZB.MOM.WW.MxGateway.Contracts.Proto;
|
|
using ZB.MOM.WW.MxGateway.Server.Sessions;
|
|
|
|
namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Sessions;
|
|
|
|
/// <summary>
/// Concurrency and fan-out tests for <see cref="SessionEventDistributor"/>, the
/// Session Resilience epic's per-session event pump. One pump drains the source
/// exactly once and fans every event to N independent per-subscriber channels.
/// Every async wait is bounded so a fan-out or shutdown deadlock fails fast.
/// </summary>
|
|
public sealed class SessionEventDistributorTests
|
|
{
|
|
/// <summary>
/// Upper bound (five seconds) passed to the <c>WaitAsync</c> calls and polling deadlines in
/// this class, so a read, completion, or dispose that never finishes fails the test instead
/// of hanging it.
/// </summary>
private static readonly TimeSpan ReadTimeout = TimeSpan.FromSeconds(5);
|
|
|
|
[Fact]
public async Task TwoSubscribers_BothReceiveFannedEventsInOrder()
{
    // Arrange: a started distributor with two leases registered before anything is emitted.
    Channel<MxEvent> upstream = Channel.CreateUnbounded<MxEvent>();
    await using SessionEventDistributor distributor = CreateDistributor(upstream.Reader);
    await distributor.StartAsync(CancellationToken.None);

    using IEventSubscriberLease first = distributor.Register();
    using IEventSubscriberLease second = distributor.Register();

    // Act: emit two events, then pull two from the first lease followed by two from the second.
    upstream.Writer.TryWrite(Event(1));
    upstream.Writer.TryWrite(Event(2));

    ulong[] seenByFirst =
    {
        (await ReadOneAsync(first.Reader)).WorkerSequence,
        (await ReadOneAsync(first.Reader)).WorkerSequence,
    };
    ulong[] seenBySecond =
    {
        (await ReadOneAsync(second.Reader)).WorkerSequence,
        (await ReadOneAsync(second.Reader)).WorkerSequence,
    };

    // Assert: each lease observed both events, in emission order.
    ulong[] expected = { 1ul, 2ul };
    Assert.Equal(expected, seenByFirst);
    Assert.Equal(expected, seenBySecond);
}
|
|
|
|
[Fact]
public async Task DisposingOneLease_StopsItsDelivery_OtherKeepsReceiving()
{
    Channel<MxEvent> upstream = Channel.CreateUnbounded<MxEvent>();
    await using SessionEventDistributor distributor = CreateDistributor(upstream.Reader);
    await distributor.StartAsync(CancellationToken.None);

    // The first lease is disposed by hand mid-test, so it is not a using declaration.
    IEventSubscriberLease dropped = distributor.Register();
    using IEventSubscriberLease survivor = distributor.Register();

    // While both leases are registered, each one reads event 1.
    upstream.Writer.TryWrite(Event(1));
    await ReadOneAsync(dropped.Reader);
    await ReadOneAsync(survivor.Reader);

    dropped.Dispose();

    // Disposal must complete the dropped lease's reader: nothing further is delivered to it.
    await AssertCompletedAsync(dropped.Reader);

    // The surviving lease still receives whatever is emitted next.
    upstream.Writer.TryWrite(Event(2));
    MxEvent next = await ReadOneAsync(survivor.Reader);
    Assert.Equal(2ul, next.WorkerSequence);
}
|
|
|
|
[Fact]
public async Task SubscriberRegisteredAfterStart_ReceivesEventsEmittedAfterRegistration()
{
    Channel<MxEvent> upstream = Channel.CreateUnbounded<MxEvent>();
    await using SessionEventDistributor distributor = CreateDistributor(upstream.Reader);
    await distributor.StartAsync(CancellationToken.None);

    // Event 1 is emitted and consumed while only the early lease exists.
    using IEventSubscriberLease early = distributor.Register();
    upstream.Writer.TryWrite(Event(1));
    await ReadOneAsync(early.Reader);

    // The late lease joins afterwards, so the first thing it reads must be event 2.
    using IEventSubscriberLease late = distributor.Register();
    upstream.Writer.TryWrite(Event(2));

    ulong firstSeenByLate = (await ReadOneAsync(late.Reader)).WorkerSequence;
    Assert.Equal(2ul, firstSeenByLate);
}
|
|
|
|
[Fact]
public async Task DisposingDistributor_CompletesAllSubscriberChannels_AndStopsPump()
{
    Channel<MxEvent> upstream = Channel.CreateUnbounded<MxEvent>();

    // Disposed explicitly below (that is the behavior under test), hence no await-using.
    SessionEventDistributor distributor = CreateDistributor(upstream.Reader);
    await distributor.StartAsync(CancellationToken.None);

    using IEventSubscriberLease first = distributor.Register();
    using IEventSubscriberLease second = distributor.Register();

    // The timeout turns a shutdown hang into a fast failure.
    Task shutdown = distributor.DisposeAsync().AsTask();
    await shutdown.WaitAsync(ReadTimeout);

    // Every lease's reader must be completed once the distributor is gone.
    foreach (IEventSubscriberLease lease in new[] { first, second })
    {
        await AssertCompletedAsync(lease.Reader);
    }
}
|
|
|
|
[Fact]
public async Task Register_AfterDispose_ThrowsObjectDisposedException()
{
    Channel<MxEvent> upstream = Channel.CreateUnbounded<MxEvent>();
    SessionEventDistributor distributor = CreateDistributor(upstream.Reader);
    await distributor.StartAsync(CancellationToken.None);

    // Tear the distributor down first; the wait is bounded so a stuck dispose fails fast.
    Task shutdown = distributor.DisposeAsync().AsTask();
    await shutdown.WaitAsync(ReadTimeout);

    // Registering on the disposed distributor must be rejected.
    void RegisterOnDisposed() => distributor.Register();
    Assert.Throws<ObjectDisposedException>(RegisterOnDisposed);
}
|
|
|
|
[Fact]
public async Task RegisterWithReplay_AfterDispose_ThrowsObjectDisposedException()
{
    // Pins the nested-lock disposal behavior in RegisterWithReplay: the inner
    // _lifecycleLock check has to raise ObjectDisposedException even though the outer
    // _replayLock snapshot succeeds on a distributor that is already disposed.
    Channel<MxEvent> upstream = Channel.CreateUnbounded<MxEvent>();
    SessionEventDistributor distributor = CreateDistributor(upstream.Reader);
    await distributor.StartAsync(CancellationToken.None);

    Task shutdown = distributor.DisposeAsync().AsTask();
    await shutdown.WaitAsync(ReadTimeout);

    // All four out results are irrelevant here; only the exception matters.
    void ResumeOnDisposed() => distributor.RegisterWithReplay(0, out _, out _, out _, out _);
    Assert.Throws<ObjectDisposedException>(ResumeOnDisposed);
}
|
|
|
|
[Fact]
public async Task ReplayBuffer_OverCapacity_EvictsOldestFirst_AndReportsGap()
{
    Channel<MxEvent> upstream = Channel.CreateUnbounded<MxEvent>();
    await using SessionEventDistributor distributor = CreateDistributor(
        upstream.Reader,
        replayBufferCapacity: 3,
        replayRetentionSeconds: 0);
    await distributor.StartAsync(CancellationToken.None);

    // Reading every event back through a live lease gives a deterministic point at which
    // the pump is known to have processed event 5 before the replay buffer is queried.
    using IEventSubscriberLease observer = distributor.Register();
    for (ulong toEmit = 1; toEmit <= 5; toEmit++)
    {
        upstream.Writer.TryWrite(Event(toEmit));
    }

    for (ulong expected = 1; expected <= 5; expected++)
    {
        MxEvent delivered = await ReadOneAsync(observer.Reader);
        Assert.Equal(expected, delivered.WorkerSequence);
    }

    // With capacity 3 only the newest three (3, 4, 5) are retained. A caller resuming
    // from 0 has missed the evicted events 1 and 2, so it is told gap=true and handed
    // just the retained tail.
    Assert.True(distributor.TryGetReplayFrom(0, out IReadOnlyList<MxEvent> replay, out bool gap));
    Assert.True(gap);

    ulong[] retained = replay.Select(e => e.WorkerSequence).ToArray();
    Assert.Equal(new ulong[] { 3, 4, 5 }, retained);
}
|
|
|
|
[Fact]
public async Task ReplayBuffer_WithinRetainedWindow_ReturnsNewerEvents_NoGap()
{
    Channel<MxEvent> upstream = Channel.CreateUnbounded<MxEvent>();
    await using SessionEventDistributor distributor = CreateDistributor(
        upstream.Reader,
        replayBufferCapacity: 10,
        replayRetentionSeconds: 0);
    await distributor.StartAsync(CancellationToken.None);

    // Emit 1..5 one at a time, confirming each delivery before sending the next.
    using IEventSubscriberLease observer = distributor.Register();
    ulong toEmit = 1;
    while (toEmit <= 5)
    {
        upstream.Writer.TryWrite(Event(toEmit));
        await ReadOneAsync(observer.Reader);
        toEmit++;
    }

    // Resuming after 2 stays inside the retained window [1..5]: no gap is reported and
    // exactly the newer events 3, 4, 5 are returned.
    Assert.True(distributor.TryGetReplayFrom(2, out IReadOnlyList<MxEvent> replay, out bool gap));
    Assert.False(gap);

    ulong[] replayed = replay.Select(e => e.WorkerSequence).ToArray();
    Assert.Equal(new ulong[] { 3, 4, 5 }, replayed);
}
|
|
|
|
[Fact]
public async Task ReplayBuffer_AgedEntries_AreEvictedAfterRetentionElapses()
{
    // A fake clock lets the test jump past the 30-second retention window without waiting.
    DateTimeOffset startOfTime = new(2026, 1, 1, 0, 0, 0, TimeSpan.Zero);
    FakeTimeProvider clock = new(startOfTime);
    Channel<MxEvent> upstream = Channel.CreateUnbounded<MxEvent>();
    await using SessionEventDistributor distributor = CreateDistributor(
        upstream.Reader,
        replayBufferCapacity: 100,
        replayRetentionSeconds: 30,
        timeProvider: clock);
    await distributor.StartAsync(CancellationToken.None);

    using IEventSubscriberLease observer = distributor.Register();

    // Events 1 and 2 are recorded at the starting time and confirmed delivered.
    upstream.Writer.TryWrite(Event(1));
    upstream.Writer.TryWrite(Event(2));
    await ReadOneAsync(observer.Reader);
    await ReadOneAsync(observer.Reader);

    // Move the clock to double the retention period.
    clock.Advance(TimeSpan.FromSeconds(60));

    // The arrival of event 3 is what triggers age-eviction of the stale entries 1 and 2.
    upstream.Writer.TryWrite(Event(3));
    await ReadOneAsync(observer.Reader);

    bool found = distributor.TryGetReplayFrom(0, out IReadOnlyList<MxEvent> replay, out bool gap);

    Assert.True(found);

    // Only event 3 survives, and a resume point of 0 predates the oldest retained entry.
    ulong[] retained = replay.Select(e => e.WorkerSequence).ToArray();
    Assert.Equal(new ulong[] { 3 }, retained);
    Assert.True(gap);
}
|
|
|
|
[Fact]
public async Task ReplayBuffer_AfterSequenceNewerThanAllRetained_ReturnsEmpty_NoGap()
{
    Channel<MxEvent> upstream = Channel.CreateUnbounded<MxEvent>();
    await using SessionEventDistributor distributor = CreateDistributor(
        upstream.Reader,
        replayBufferCapacity: 10,
        replayRetentionSeconds: 0);
    await distributor.StartAsync(CancellationToken.None);

    // Emit 1..3, confirming each delivery before sending the next.
    using IEventSubscriberLease observer = distributor.Register();
    ulong toEmit = 1;
    while (toEmit <= 3)
    {
        upstream.Writer.TryWrite(Event(toEmit));
        await ReadOneAsync(observer.Reader);
        toEmit++;
    }

    // A resume point of 3 equals the newest retained sequence: the caller is fully
    // caught up, so the replay list is empty and no gap is reported.
    Assert.True(distributor.TryGetReplayFrom(3, out IReadOnlyList<MxEvent> replay, out bool gap));
    Assert.False(gap);
    Assert.Empty(replay);
}
|
|
|
|
[Fact]
public async Task ReplayBuffer_Capacity0_AfterSequenceBelowHighestSeen_ReportsGap_NoEvents()
{
    // With the buffer disabled (capacity 0) events still advance the highest-seen
    // counter but are not retained, so a caller that is behind that counter has to be
    // told to re-snapshot.
    Channel<MxEvent> upstream = Channel.CreateUnbounded<MxEvent>();
    await using SessionEventDistributor distributor = CreateDistributor(
        upstream.Reader,
        replayBufferCapacity: 0,
        replayRetentionSeconds: 0);
    await distributor.StartAsync(CancellationToken.None);

    using IEventSubscriberLease observer = distributor.Register();
    ulong toEmit = 1;
    while (toEmit <= 3)
    {
        upstream.Writer.TryWrite(Event(toEmit));
        await ReadOneAsync(observer.Reader);
        toEmit++;
    }

    // Resume point 1 is behind highest-seen 3: a gap is reported and nothing is replayed.
    Assert.True(distributor.TryGetReplayFrom(1, out IReadOnlyList<MxEvent> replay, out bool gap));
    Assert.True(gap);
    Assert.Empty(replay);
}
|
|
|
|
[Fact]
public async Task ReplayBuffer_Capacity0_AfterSequenceAtOrAboveHighestSeen_NoGap_NoEvents()
{
    // With the buffer disabled (capacity 0), a caller that is already caught up gets
    // neither a gap nor any replayed events.
    Channel<MxEvent> upstream = Channel.CreateUnbounded<MxEvent>();
    await using SessionEventDistributor distributor = CreateDistributor(
        upstream.Reader,
        replayBufferCapacity: 0,
        replayRetentionSeconds: 0);
    await distributor.StartAsync(CancellationToken.None);

    using IEventSubscriberLease observer = distributor.Register();
    ulong toEmit = 1;
    while (toEmit <= 3)
    {
        upstream.Writer.TryWrite(Event(toEmit));
        await ReadOneAsync(observer.Reader);
        toEmit++;
    }

    // Resume point 3 matches highest-seen 3, i.e. nothing has been missed.
    Assert.True(distributor.TryGetReplayFrom(3, out IReadOnlyList<MxEvent> replay, out bool gap));
    Assert.False(gap);
    Assert.Empty(replay);
}
|
|
|
|
[Fact]
public async Task ReplayBuffer_NoEventsSeen_AnyAfterSequence_NoGap_NoEvents()
{
    // When no event has ever been seen there is nothing a caller could have missed,
    // so the gap flag has to be false.
    Channel<MxEvent> upstream = Channel.CreateUnbounded<MxEvent>();
    await using SessionEventDistributor distributor = CreateDistributor(
        upstream.Reader,
        replayBufferCapacity: 0,
        replayRetentionSeconds: 0);

    // StartAsync is deliberately never called and nothing is written to the source.
    Assert.True(distributor.TryGetReplayFrom(0, out IReadOnlyList<MxEvent> replay, out bool gap));
    Assert.False(gap);
    Assert.Empty(replay);
}
|
|
|
|
[Fact]
public async Task ReplayBuffer_AfterSequenceMaxValue_WithRetainedEvents_NoGap_NoNewEvents()
{
    // Regression guard for ulong.MaxValue as the resume point: afterSequence + 1 wraps
    // to 0, and the old code compared that against oldestRetained and wrongly reported
    // gap=true. The corrected formula has to give gap=false with an empty replay list.
    Channel<MxEvent> upstream = Channel.CreateUnbounded<MxEvent>();
    await using SessionEventDistributor distributor = CreateDistributor(
        upstream.Reader,
        replayBufferCapacity: 10,
        replayRetentionSeconds: 0);
    await distributor.StartAsync(CancellationToken.None);

    // One event is emitted and confirmed delivered before the replay buffer is queried.
    using IEventSubscriberLease observer = distributor.Register();
    upstream.Writer.TryWrite(Event(1));
    await ReadOneAsync(observer.Reader);

    Assert.True(distributor.TryGetReplayFrom(ulong.MaxValue, out IReadOnlyList<MxEvent> replay, out bool gap));
    Assert.False(gap);
    Assert.Empty(replay);
}
|
|
|
|
[Fact]
public async Task SlowSubscriberOverflow_DisconnectsOnlyThatSubscriber_PumpAndOtherKeepRunning()
{
    // Per-subscriber backpressure isolation (Task 5): a subscriber that stops reading
    // overflows its own tiny channel and is disconnected with an EventQueueOverflow
    // fault, while a second, healthy subscriber keeps receiving and the pump keeps going.
    Channel<MxEvent> upstream = Channel.CreateUnbounded<MxEvent>();

    // State written by the overflow handler and read back by the test. The handler's
    // argument and the "handler ran" marker are separate locals so that each can go
    // through Volatile.Read/Write; the call counter is bumped with Interlocked.Increment.
    int handlerCalls = 0;
    int handlerRan = 0;
    bool reportedOnlySubscriber = false;

    await using SessionEventDistributor distributor = new(
        "session-test",
        token => upstream.Reader.ReadAllAsync(token),
        subscriberQueueCapacity: 2,
        replayBufferCapacity: 1024,
        replayRetentionSeconds: 0,
        NullLogger<SessionEventDistributor>.Instance,
        TimeProvider.System,
        (onlySubscriber, _) =>
        {
            Interlocked.Increment(ref handlerCalls);
            Volatile.Write(ref reportedOnlySubscriber, onlySubscriber);
            Volatile.Write(ref handlerRan, 1);
        },
        singleSubscriberMode: false);
    await distributor.StartAsync(CancellationToken.None);

    // The slow lease is registered but never read, so its capacity-2 channel fills up.
    using IEventSubscriberLease slow = distributor.Register();

    // The healthy lease is drained after every single write.
    using IEventSubscriberLease healthy = distributor.Register();

    // Ten events are more than the slow channel can hold; the healthy lease keeps pace
    // and must see every one of them in order.
    for (ulong expected = 1; expected <= 10; expected++)
    {
        upstream.Writer.TryWrite(Event(expected));
        MxEvent delivered = await ReadOneAsync(healthy.Reader);
        Assert.Equal(expected, delivered.WorkerSequence);
    }

    // Draining the slow lease must end in the overflow fault.
    SessionManagerException fault = await Assert.ThrowsAsync<SessionManagerException>(
        () => DrainUntilFaultAsync(slow.Reader));
    Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, fault.ErrorCode);

    // In multi-subscriber mode isOnlySubscriber is always false (Task 8 mode-gating).
    // Volatile.Read keeps these test-thread reads ordered after the handler's writes.
    Assert.Equal(1, Volatile.Read(ref handlerCalls));
    Assert.Equal(1, Volatile.Read(ref handlerRan));
    Assert.False(Volatile.Read(ref reportedOnlySubscriber));
    Assert.Equal(1, distributor.SubscriberCount);

    // The pump is still alive: a fresh event reaches the healthy lease.
    upstream.Writer.TryWrite(Event(11));
    MxEvent afterOverflow = await ReadOneAsync(healthy.Reader);
    Assert.Equal(11ul, afterOverflow.WorkerSequence);
}
|
|
|
|
[Fact]
public async Task SlowSubscriberOverflow_WithMultipleSubscribers_HandlerSeesIsOnlySubscriberFalse_OtherKeepsReceiving()
{
    // Distributor-level pin for "FailFast with multiple subscribers degrades to
    // disconnect-only (no session fault)". In multi-subscriber mode isOnlySubscriber is
    // always false (Task 8 mode-gating), so a FailFast-wired handler must NOT fault the
    // session. The distributor is driven directly (no GatewaySession) with two leases
    // and a FailFast-style handler seam; the slow lease is overflowed and the test
    // checks that (a) isOnlySubscriber was false, (b) the other lease keeps receiving,
    // and (c) the pump keeps running.
    Channel<MxEvent> upstream = Channel.CreateUnbounded<MxEvent>();
    bool degradedPathTaken = false;
    bool faultPathTaken = false; // set only if the handler is told it is the sole subscriber

    await using SessionEventDistributor distributor = new(
        "session-multi-sub",
        token => upstream.Reader.ReadAllAsync(token),
        subscriberQueueCapacity: 2,
        replayBufferCapacity: 0,
        replayRetentionSeconds: 0,
        NullLogger<SessionEventDistributor>.Instance,
        TimeProvider.System,
        (onlySubscriber, _) =>
        {
            if (onlySubscriber)
            {
                // Sole-subscriber branch: FailFast would fault the session. Must not happen here.
                Volatile.Write(ref faultPathTaken, true);
            }
            else
            {
                // Multi-subscriber branch: FailFast degrades to disconnect-only.
                Volatile.Write(ref degradedPathTaken, true);
            }
        },
        singleSubscriberMode: false);
    await distributor.StartAsync(CancellationToken.None);

    // The slow lease is never read, so its capacity-2 channel overflows quickly.
    using IEventSubscriberLease slow = distributor.Register();

    // The healthy lease is drained after every single write.
    using IEventSubscriberLease healthy = distributor.Register();

    // Ten events are enough to overflow the slow lease's channel.
    ulong toEmit = 1;
    while (toEmit <= 10)
    {
        upstream.Writer.TryWrite(Event(toEmit));
        await ReadOneAsync(healthy.Reader);
        toEmit++;
    }

    // Draining the slow lease must end in the overflow fault.
    SessionManagerException fault = await Assert.ThrowsAsync<SessionManagerException>(
        () => DrainUntilFaultAsync(slow.Reader));
    Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, fault.ErrorCode);

    // The handler ran with isOnlySubscriber==false ...
    Assert.True(Volatile.Read(ref degradedPathTaken));

    // ... and never with isOnlySubscriber==true, so the session-fault branch stayed untouched.
    Assert.False(Volatile.Read(ref faultPathTaken));

    // Neither the pump nor the healthy lease is affected: a fresh event still arrives.
    upstream.Writer.TryWrite(Event(11));
    MxEvent afterOverflow = await ReadOneAsync(healthy.Reader);
    Assert.Equal(11ul, afterOverflow.WorkerSequence);
}
|
|
|
|
[Fact]
public async Task InternalSubscriberOverflow_HandlerSeesIsOnlySubscriberFalse_ProvingCountExcludesInternal()
{
    // Issue 3: checks that CountExternalSubscribers() leaves out the internal dashboard
    // subscriber, so a FailFast policy would NOT fault the session even when the
    // internal subscriber is the ONLY one registered. The handler is expected to receive
    // isOnlySubscriber==false (not true), because the overflowing subscriber is internal
    // and therefore not part of the external-subscriber count.
    Channel<MxEvent> upstream = Channel.CreateUnbounded<MxEvent>();
    int handlerRan = 0;
    bool reportedOnlySubscriber = false;
    bool reportedInternal = false;

    await using SessionEventDistributor distributor = new(
        "session-internal-overflow",
        token => upstream.Reader.ReadAllAsync(token),
        subscriberQueueCapacity: 2,
        replayBufferCapacity: 0,
        replayRetentionSeconds: 0,
        NullLogger<SessionEventDistributor>.Instance,
        TimeProvider.System,
        (onlySubscriber, internalSubscriber) =>
        {
            // The marker is written last, after both observed values.
            Volatile.Write(ref reportedOnlySubscriber, onlySubscriber);
            Volatile.Write(ref reportedInternal, internalSubscriber);
            Volatile.Write(ref handlerRan, 1);
        });
    await distributor.StartAsync(CancellationToken.None);

    // Only an internal lease is registered; there is no external subscriber at all.
    using IEventSubscriberLease internalLease = distributor.Register(isInternal: true);

    // Ten unread events are enough to overflow the capacity-2 channel.
    ulong toEmit = 1;
    while (toEmit <= 10)
    {
        upstream.Writer.TryWrite(Event(toEmit));
        toEmit++;
    }

    // Draining the internal lease must end in the overflow fault.
    SessionManagerException fault = await Assert.ThrowsAsync<SessionManagerException>(
        () => DrainUntilFaultAsync(internalLease.Reader));
    Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, fault.ErrorCode);

    // The handler runs on the pump thread, so poll (bounded by ReadTimeout) until it has fired.
    await Task.Run(async () =>
    {
        using CancellationTokenSource deadline = new(ReadTimeout);
        while (true)
        {
            if (Volatile.Read(ref handlerRan) != 0)
            {
                break;
            }

            await Task.Delay(10, deadline.Token);
        }
    });

    // isOnlySubscriber has to be FALSE even though the internal lease was the ONLY
    // subscriber: it is excluded from the external count, so a FailFast policy keyed on
    // that count would NOT fault the session.
    bool handlerFired = Volatile.Read(ref handlerRan) == 1;
    Assert.True(handlerFired, "Overflow handler should have fired.");
    Assert.False(
        Volatile.Read(ref reportedOnlySubscriber),
        "isOnlySubscriber must be false for an internal subscriber (CountExternalSubscribers excludes it).");
    Assert.True(
        Volatile.Read(ref reportedInternal),
        "isInternal must be true for a subscriber registered with isInternal: true.");
}
|
|
|
|
[Fact]
public async Task SingleSubscriberMode_LoneExternalOverflow_HandlerSeesIsOnlySubscriberTrue()
{
    // Task 8 mode-gating: in single-subscriber mode, a lone external subscriber that
    // overflows is reported with isOnlySubscriber==true, which preserves the legacy
    // FailFast session-fault path. The decision is keyed on the fixed session mode
    // rather than on a live count, which makes it race-free.
    Channel<MxEvent> upstream = Channel.CreateUnbounded<MxEvent>();
    int handlerRan = 0;
    bool reportedOnlySubscriber = false;

    await using SessionEventDistributor distributor = new(
        "session-single-sub",
        token => upstream.Reader.ReadAllAsync(token),
        subscriberQueueCapacity: 2,
        replayBufferCapacity: 0,
        replayRetentionSeconds: 0,
        NullLogger<SessionEventDistributor>.Instance,
        TimeProvider.System,
        (onlySubscriber, _) =>
        {
            // The marker is written last, after the observed value.
            Volatile.Write(ref reportedOnlySubscriber, onlySubscriber);
            Volatile.Write(ref handlerRan, 1);
        },
        singleSubscriberMode: true);
    await distributor.StartAsync(CancellationToken.None);

    using IEventSubscriberLease external = distributor.Register();

    // Ten unread events against a capacity-2 subscriber channel.
    ulong toEmit = 1;
    while (toEmit <= 10)
    {
        upstream.Writer.TryWrite(Event(toEmit));
        toEmit++;
    }

    // Draining the lease must end in the overflow fault.
    SessionManagerException fault = await Assert.ThrowsAsync<SessionManagerException>(
        () => DrainUntilFaultAsync(external.Reader));
    Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, fault.ErrorCode);

    // Poll (bounded by ReadTimeout) until the handler has recorded its observation.
    await Task.Run(async () =>
    {
        using CancellationTokenSource deadline = new(ReadTimeout);
        while (true)
        {
            if (Volatile.Read(ref handlerRan) != 0)
            {
                break;
            }

            await Task.Delay(10, deadline.Token);
        }
    });

    // Check that the handler really fired before looking at what it saw; otherwise the
    // value assertion could hold vacuously if the overflow never happened.
    Assert.Equal(1, Volatile.Read(ref handlerRan));
    Assert.True(
        Volatile.Read(ref reportedOnlySubscriber),
        "isOnlySubscriber must be true for a lone external subscriber in single-subscriber mode.");
}
|
|
|
|
[Fact]
public async Task RegisterWithReplay_WithinRetainedWindow_ReturnsNewerEvents_NoGap_ThenLive()
{
    Channel<MxEvent> upstream = Channel.CreateUnbounded<MxEvent>();
    await using SessionEventDistributor distributor = CreateDistributor(
        upstream.Reader,
        replayBufferCapacity: 10,
        replayRetentionSeconds: 0);
    await distributor.StartAsync(CancellationToken.None);

    // A primer lease reads events 1..5 back one by one, so the pump is known to have
    // handled all five before the resume below.
    using IEventSubscriberLease primer = distributor.Register();
    ulong toEmit = 1;
    while (toEmit <= 5)
    {
        upstream.Writer.TryWrite(Event(toEmit));
        await ReadOneAsync(primer.Reader);
        toEmit++;
    }

    // Resuming after 2 is still covered by the retained window [1..5]: no gap, replay 3..5.
    using IEventSubscriberLease resumed = distributor.RegisterWithReplay(
        2,
        out IReadOnlyList<MxEvent> replay,
        out bool gap,
        out ulong oldestAvailable,
        out ulong liveResume);

    Assert.False(gap);
    ulong[] replayed = replay.Select(e => e.WorkerSequence).ToArray();
    Assert.Equal(new ulong[] { 3, 4, 5 }, replayed);
    Assert.Equal(5ul, liveResume);

    // OldestAvailableSequence only carries meaning when gap is true; without a gap it is 0.
    Assert.Equal(0ul, oldestAvailable);

    // After the replay, the next live event arrives on the resumed lease's channel.
    upstream.Writer.TryWrite(Event(6));
    MxEvent live = await ReadOneAsync(resumed.Reader);
    Assert.Equal(6ul, live.WorkerSequence);
}
|
|
|
|
[Fact]
public async Task RegisterWithReplay_BelowOldestRetained_ReportsGap_AndOldestAvailable()
{
    Channel<MxEvent> upstream = Channel.CreateUnbounded<MxEvent>();
    await using SessionEventDistributor distributor = CreateDistributor(
        upstream.Reader,
        replayBufferCapacity: 3,
        replayRetentionSeconds: 0);
    await distributor.StartAsync(CancellationToken.None);

    // Emit 1..5 through a primer lease, confirming each delivery before the next write.
    using IEventSubscriberLease primer = distributor.Register();
    ulong toEmit = 1;
    while (toEmit <= 5)
    {
        upstream.Writer.TryWrite(Event(toEmit));
        await ReadOneAsync(primer.Reader);
        toEmit++;
    }

    // Capacity 3 keeps 3, 4, 5 and has evicted 1 and 2, so resuming after 0 reports a
    // gap with 3 as the oldest sequence still available.
    using IEventSubscriberLease resumed = distributor.RegisterWithReplay(
        0,
        out IReadOnlyList<MxEvent> replay,
        out bool gap,
        out ulong oldestAvailable,
        out ulong liveResume);

    Assert.True(gap);
    Assert.Equal(3ul, oldestAvailable);
    ulong[] replayed = replay.Select(e => e.WorkerSequence).ToArray();
    Assert.Equal(new ulong[] { 3, 4, 5 }, replayed);
    Assert.Equal(5ul, liveResume);
}
|
|
|
|
[Fact]
public async Task RegisterWithReplay_NothingRetainedNewer_LiveResumeEqualsAfterSequence_NoGap()
{
    Channel<MxEvent> upstream = Channel.CreateUnbounded<MxEvent>();
    await using SessionEventDistributor distributor = CreateDistributor(
        upstream.Reader,
        replayBufferCapacity: 10,
        replayRetentionSeconds: 0);
    await distributor.StartAsync(CancellationToken.None);

    // Emit 1..3 through a primer lease, confirming each delivery before the next write.
    using IEventSubscriberLease primer = distributor.Register();
    ulong toEmit = 1;
    while (toEmit <= 3)
    {
        upstream.Writer.TryWrite(Event(toEmit));
        await ReadOneAsync(primer.Reader);
        toEmit++;
    }

    // Resuming after 3, the newest retained sequence, means the caller is fully caught
    // up: no gap, nothing to replay, and the live resume point is the requested
    // watermark unchanged.
    using IEventSubscriberLease resumed = distributor.RegisterWithReplay(
        3,
        out IReadOnlyList<MxEvent> replay,
        out bool gap,
        out ulong oldestAvailable,
        out ulong liveResume);

    Assert.False(gap);
    Assert.Empty(replay);
    Assert.Equal(3ul, liveResume);

    // OldestAvailableSequence only carries meaning when gap is true; without a gap it is 0.
    Assert.Equal(0ul, oldestAvailable);

    // The next live event arrives on the resumed lease's channel.
    upstream.Writer.TryWrite(Event(4));
    MxEvent live = await ReadOneAsync(resumed.Reader);
    Assert.Equal(4ul, live.WorkerSequence);
}
|
|
|
|
/// <summary>
/// Reads and discards everything buffered in <paramref name="reader"/> until the channel
/// ends, surfacing the channel's completion fault (if any) to the caller.
/// </summary>
/// <param name="reader">Subscriber channel expected to end with a fault.</param>
/// <returns>
/// A task that faults with the channel's completion exception. It completes normally
/// only when the channel was completed without an error, which lets an enclosing
/// <c>Assert.ThrowsAsync</c> report the missing exception instead of hanging.
/// </returns>
/// <exception cref="TimeoutException">
/// A single wait for more data or for completion exceeded <c>ReadTimeout</c>.
/// </exception>
private static async Task DrainUntilFaultAsync(ChannelReader<MxEvent> reader)
{
    // WaitToReadAsync throws the completion fault once the buffered tail is consumed,
    // and returns false when the channel completed cleanly. The result must be checked:
    // ignoring it turns a clean completion into an endless loop, because every later
    // wait returns false immediately.
    while (await reader.WaitToReadAsync().AsTask().WaitAsync(ReadTimeout))
    {
        while (reader.TryRead(out _))
        {
        }
    }

    // Only reached on a fault-free completion; observing Completion keeps the
    // "channel has ended" contract explicit and stays bounded like every other wait.
    await reader.Completion.WaitAsync(ReadTimeout);
}
|
|
|
|
/// <summary>
/// Builds a distributor over <paramref name="source"/> with the default replay settings
/// used by tests that do not care about replay: capacity 1024, retention 300 seconds.
/// </summary>
private static SessionEventDistributor CreateDistributor(ChannelReader<MxEvent> source)
{
    return CreateDistributor(source, replayBufferCapacity: 1024, replayRetentionSeconds: 300);
}
|
|
|
|
/// <summary>
/// Builds a distributor named "session-test" that pumps from <paramref name="source"/>,
/// with a subscriber queue capacity of 64 and a null logger.
/// </summary>
/// <param name="source">Reader the distributor's pump enumerates via <c>ReadAllAsync</c>.</param>
/// <param name="replayBufferCapacity">Forwarded unchanged as the replay buffer capacity.</param>
/// <param name="replayRetentionSeconds">Forwarded unchanged as the replay retention, in seconds.</param>
/// <param name="timeProvider">Clock to hand to the distributor; <see cref="TimeProvider.System"/> when null.</param>
private static SessionEventDistributor CreateDistributor(
    ChannelReader<MxEvent> source,
    int replayBufferCapacity,
    double replayRetentionSeconds,
    TimeProvider? timeProvider = null)
{
    TimeProvider clock = timeProvider ?? TimeProvider.System;

    return new SessionEventDistributor(
        "session-test",
        token => source.ReadAllAsync(token),
        subscriberQueueCapacity: 64,
        replayBufferCapacity: replayBufferCapacity,
        replayRetentionSeconds: replayRetentionSeconds,
        NullLogger<SessionEventDistributor>.Instance,
        clock);
}
|
|
|
|
/// <summary>
/// Creates a test event for session "session-test" carrying <paramref name="sequence"/>
/// as its worker sequence.
/// </summary>
private static MxEvent Event(ulong sequence)
{
    return new MxEvent
    {
        SessionId = "session-test",
        WorkerSequence = sequence,
    };
}
|
|
|
|
/// <summary>
/// Waits (bounded by <c>ReadTimeout</c>) for <paramref name="reader"/> to signal
/// readiness, then takes exactly one event from it.
/// </summary>
/// <returns>The event that was read; the test fails if none could be taken.</returns>
private static async Task<MxEvent> ReadOneAsync(ChannelReader<MxEvent> reader)
{
    Task<bool> readable = reader.WaitToReadAsync().AsTask();
    await readable.WaitAsync(ReadTimeout);

    // The wait's result is not inspected; a failed TryRead is what fails the test.
    bool taken = reader.TryRead(out MxEvent? item);
    Assert.True(taken);
    return item!;
}
|
|
|
|
/// <summary>
/// Asserts that <paramref name="reader"/> reaches completion within <c>ReadTimeout</c>,
/// discarding any events still buffered on it first.
/// </summary>
/// <param name="reader">Subscriber channel that is expected to have been completed.</param>
/// <exception cref="TimeoutException">
/// The channel did not complete within <c>ReadTimeout</c>.
/// </exception>
private static async Task AssertCompletedAsync(ChannelReader<MxEvent> reader)
{
    // Completion only finishes once the channel is marked complete AND every buffered
    // item has been consumed, so leftovers are drained here. Without the drain, a lease
    // with unread events would time out even though its channel was completed correctly.
    async Task DrainToCompletionAsync()
    {
        while (await reader.WaitToReadAsync())
        {
            while (reader.TryRead(out _))
            {
            }
        }

        await reader.Completion;
    }

    // One overall bound, so a channel that never completes still fails fast.
    await DrainToCompletionAsync().WaitAsync(ReadTimeout);
}
|
|
}
|