feat(sessions): per-subscriber backpressure isolation in SessionEventDistributor

This commit is contained in:
Joseph Doherty
2026-06-15 13:39:25 -04:00
parent 61627fc5b0
commit 039111ca05
9 changed files with 308 additions and 66 deletions
@@ -157,61 +157,96 @@ public sealed class EventStreamServiceTests
await WaitUntilAsync(() => metrics.GetSnapshot().GrpcEventStreamQueueDepth == 0);
}
/// <summary>Verifies that event queue overflow faults the session and reports the overflow metric.</summary>
// TODO(Task 5): re-enable and re-target this to per-subscriber backpressure isolation.
[Fact(Skip = "Backpressure/overflow policy moved into SessionEventDistributor in Task 4; re-enabled and re-targeted to per-subscriber isolation in Task 5.")]
/// <summary>
/// Re-targeted in Task 5: a per-subscriber channel overflow in the session's
/// <see cref="SessionEventDistributor"/> faults the whole session under the legacy
/// single-subscriber FailFast policy (the default, single-subscriber mode) and records
/// the overflow + fault metrics. The distributor completes this subscriber's channel
/// with the overflow fault, which surfaces here as the same
/// <see cref="SessionManagerErrorCode.EventQueueOverflow"/> the pre-epic per-RPC
/// overflow produced.
/// </summary>
[Fact]
public async Task StreamEventsAsync_WhenStreamQueueOverflows_FaultsSessionAndReportsOverflow()
{
FakeWorkerClient workerClient = new();
GatewaySession session = CreateReadySession(workerClient);
using GatewayMetrics metrics = new();
GatewaySession session = CreateReadySession(
workerClient,
queueCapacity: 1,
metrics: metrics,
backpressurePolicy: EventBackpressurePolicy.FailFast);
EventStreamService service = CreateService(
new FakeSessionManager(session),
metrics,
queueCapacity: 1);
workerClient.Events.Add(CreateWorkerEvent(sequence: 1, MxEventFamily.OnDataChange));
workerClient.Events.Add(CreateWorkerEvent(sequence: 2, MxEventFamily.OnDataChange));
workerClient.Events.Add(CreateWorkerEvent(sequence: 3, MxEventFamily.OnDataChange));
for (ulong sequence = 1; sequence <= 50; sequence++)
{
workerClient.Events.Add(CreateWorkerEvent(sequence, MxEventFamily.OnDataChange));
}
workerClient.CompleteAfterConfiguredEvents = true;
await using IAsyncEnumerator<MxEvent> subscriber = service
.StreamEventsAsync(CreateRequest(session.SessionId), CancellationToken.None)
.GetAsyncEnumerator();
Assert.True(await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout));
await WaitUntilAsync(() => session.State == SessionState.Faulted);
// The pump fans 50 events into a capacity-1 subscriber channel faster than this
// single reader drains, so one of the reads observes the terminal overflow fault.
SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(
async () => await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout));
async () =>
{
while (await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout))
{
}
});
Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, exception.ErrorCode);
await WaitUntilAsync(() => session.State == SessionState.Faulted);
Assert.Equal(SessionState.Faulted, session.State);
Assert.Equal(1, metrics.GetSnapshot().QueueOverflows);
Assert.Equal(1, metrics.GetSnapshot().Faults);
GatewayMetricsSnapshot snapshot = metrics.GetSnapshot();
Assert.Equal(1, snapshot.QueueOverflows);
Assert.Equal(1, snapshot.Faults);
}
/// <summary>Verifies that the disconnect backpressure policy disconnects the subscriber without faulting the session.</summary>
// TODO(Task 5): re-enable and re-target this to per-subscriber backpressure isolation.
[Fact(Skip = "Backpressure/overflow policy moved into SessionEventDistributor in Task 4; re-enabled and re-targeted to per-subscriber isolation in Task 5.")]
/// <summary>
/// Re-targeted in Task 5: under the DisconnectSubscriber policy a per-subscriber
/// channel overflow disconnects only that subscriber's stream (terminal
/// <see cref="SessionManagerErrorCode.EventQueueOverflow"/>) and records the overflow
/// metric, but leaves the session <see cref="SessionState.Ready"/> and records no
/// fault. The session, pump, and any other subscribers are unaffected.
/// </summary>
[Fact]
public async Task StreamEventsAsync_WhenStreamQueueOverflowsWithDisconnectPolicy_LeavesSessionReady()
{
FakeWorkerClient workerClient = new();
GatewaySession session = CreateReadySession(workerClient);
using GatewayMetrics metrics = new();
GatewaySession session = CreateReadySession(
workerClient,
queueCapacity: 1,
metrics: metrics,
backpressurePolicy: EventBackpressurePolicy.DisconnectSubscriber);
EventStreamService service = CreateService(
new FakeSessionManager(session),
metrics,
queueCapacity: 1,
backpressurePolicy: EventBackpressurePolicy.DisconnectSubscriber);
workerClient.Events.Add(CreateWorkerEvent(sequence: 1, MxEventFamily.OnDataChange));
workerClient.Events.Add(CreateWorkerEvent(sequence: 2, MxEventFamily.OnDataChange));
workerClient.Events.Add(CreateWorkerEvent(sequence: 3, MxEventFamily.OnDataChange));
for (ulong sequence = 1; sequence <= 50; sequence++)
{
workerClient.Events.Add(CreateWorkerEvent(sequence, MxEventFamily.OnDataChange));
}
workerClient.CompleteAfterConfiguredEvents = true;
await using IAsyncEnumerator<MxEvent> subscriber = service
.StreamEventsAsync(CreateRequest(session.SessionId), CancellationToken.None)
.GetAsyncEnumerator();
Assert.True(await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout));
SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(
async () => await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout));
async () =>
{
while (await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout))
{
}
});
Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, exception.ErrorCode);
Assert.Equal(SessionState.Ready, session.State);
@@ -395,8 +430,14 @@ public sealed class EventStreamServiceTests
private static GatewaySession CreateReadySession(
FakeWorkerClient workerClient,
string sessionId = "session-events",
int queueCapacity = 8)
int queueCapacity = 8,
GatewayMetrics? metrics = null,
EventBackpressurePolicy backpressurePolicy = EventBackpressurePolicy.FailFast)
{
// The per-subscriber overflow policy now lives in the session's
// SessionEventDistributor, so the session must share the same metrics sink and
// backpressure policy the overflow assertions observe. queueCapacity flows into the
// distributor's per-subscriber channel bound, which is what overflows.
GatewaySession session = new(
sessionId,
GatewayContractInfo.DefaultBackendName,
@@ -413,9 +454,14 @@ public sealed class EventStreamServiceTests
DateTimeOffset.UtcNow,
new SessionEventStreaming(
new MxAccessGrpcMapper(),
new EventOptions { QueueCapacity = queueCapacity },
new EventOptions
{
QueueCapacity = queueCapacity,
BackpressurePolicy = backpressurePolicy,
},
NullLogger<SessionEventDistributor>.Instance,
TimeProvider.System));
TimeProvider.System,
metrics ?? new GatewayMetrics()));
session.AttachWorkerClient(workerClient);
session.MarkReady();
@@ -3,6 +3,7 @@ using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Server.Configuration;
using ZB.MOM.WW.MxGateway.Server.Grpc;
using ZB.MOM.WW.MxGateway.Server.Metrics;
using ZB.MOM.WW.MxGateway.Server.Sessions;
using ZB.MOM.WW.MxGateway.Server.Workers;
@@ -260,7 +261,8 @@ public sealed class GatewaySessionTests
new MxAccessGrpcMapper(),
new EventOptions { QueueCapacity = 8 },
NullLogger<SessionEventDistributor>.Instance,
TimeProvider.System));
TimeProvider.System,
new GatewayMetrics()));
session.AttachWorkerClient(workerClient);
session.MarkReady();
return session;
@@ -328,6 +328,73 @@ public sealed class SessionEventDistributorTests
Assert.Empty(replay);
}
[Fact]
public async Task SlowSubscriberOverflow_DisconnectsOnlyThatSubscriber_PumpAndOtherKeepRunning()
{
// Per-subscriber backpressure isolation (Task 5): one subscriber stops reading and
// overflows its own tiny channel; it is disconnected with an EventQueueOverflow fault
// while a second, healthy subscriber keeps receiving and the pump keeps pumping.
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
int overflowCalls = 0;
bool? observedIsOnlySubscriber = null;
await using SessionEventDistributor distributor = new(
"session-test",
ct => source.Reader.ReadAllAsync(ct),
subscriberQueueCapacity: 2,
replayBufferCapacity: 1024,
replayRetentionSeconds: 0,
NullLogger<SessionEventDistributor>.Instance,
TimeProvider.System,
isOnlySubscriber =>
{
Interlocked.Increment(ref overflowCalls);
observedIsOnlySubscriber = isOnlySubscriber;
});
await distributor.StartAsync(CancellationToken.None);
// Slow subscriber: registered but never read, so its capacity-2 channel fills.
using IEventSubscriberLease slow = distributor.Register();
// Healthy subscriber: drains promptly throughout.
using IEventSubscriberLease healthy = distributor.Register();
// Push more events than the slow subscriber's channel can hold while the healthy one
// keeps up. The slow channel overflows; the healthy channel does not.
for (ulong sequence = 1; sequence <= 10; sequence++)
{
source.Writer.TryWrite(Event(sequence));
MxEvent received = await ReadOneAsync(healthy.Reader);
Assert.Equal(sequence, received.WorkerSequence);
}
// The slow subscriber is disconnected with the overflow fault.
SessionManagerException fault = await Assert.ThrowsAsync<SessionManagerException>(
async () => await DrainUntilFaultAsync(slow.Reader));
Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, fault.ErrorCode);
// Two subscribers were registered at overflow time, so isOnlySubscriber is false.
Assert.Equal(1, overflowCalls);
Assert.False(observedIsOnlySubscriber);
Assert.Equal(1, distributor.SubscriberCount);
// The pump is still running and the healthy subscriber still receives new events.
source.Writer.TryWrite(Event(11));
MxEvent afterOverflow = await ReadOneAsync(healthy.Reader);
Assert.Equal(11ul, afterOverflow.WorkerSequence);
}
private static async Task DrainUntilFaultAsync(ChannelReader<MxEvent> reader)
{
// Drains any buffered events, then surfaces the channel's completion fault (if any)
// by awaiting the final read past the buffered tail.
while (true)
{
await reader.WaitToReadAsync().AsTask().WaitAsync(ReadTimeout);
while (reader.TryRead(out _))
{
}
}
}
private static SessionEventDistributor CreateDistributor(ChannelReader<MxEvent> source)
=> CreateDistributor(source, replayBufferCapacity: 1024, replayRetentionSeconds: 300);