diff --git a/docs/GatewayConfiguration.md b/docs/GatewayConfiguration.md
index 90db1b5..db57112 100644
--- a/docs/GatewayConfiguration.md
+++ b/docs/GatewayConfiguration.md
@@ -136,14 +136,20 @@ ordering and avoids competing consumers.
| Option | Default | Description |
|--------|---------|-------------|
| `MxGateway:Events:QueueCapacity` | `10000` | Capacity for bounded per-session event queues used by the gateway worker event channel and the public gRPC event stream queue. |
-| `MxGateway:Events:BackpressurePolicy` | `FailFast` | Event backpressure behavior. `FailFast` faults the session on public stream queue overflow. `DisconnectSubscriber` disconnects only the slow stream. |
+| `MxGateway:Events:BackpressurePolicy` | `FailFast` | Per-subscriber event backpressure behavior when a subscriber's bounded event channel overflows. Overflow is isolated to the offending subscriber: it is always disconnected with an `EventQueueOverflow` fault while the session pump and other subscribers keep running. `FailFast` additionally faults the whole session only in the legacy single-subscriber case (the current default mode); with multiple subscribers it degrades to a per-subscriber disconnect so one slow consumer never faults a shared session. `DisconnectSubscriber` disconnects only the slow subscriber in all cases. |
| `MxGateway:Events:ReplayBufferCapacity` | `1024` | Maximum number of events retained per session in the replay ring buffer, used to re-deliver events a returning subscriber missed (reconnect/reattach). The oldest retained event is evicted once this count is exceeded. `0` disables replay retention. |
| `MxGateway:Events:ReplayRetentionSeconds` | `300` | Maximum age, in seconds, of an event retained in the replay ring buffer. Entries older than this are evicted regardless of capacity. `0` disables age-based eviction. |
-`QueueCapacity` must be greater than zero. With `FailFast`, queue overflow
-faults the affected worker or session instead of silently dropping MXAccess
-events. With `DisconnectSubscriber`, public gRPC stream overflow terminates only
-the affected stream while the MXAccess session remains active.
+`QueueCapacity` must be greater than zero; it bounds each per-subscriber event
+channel fed by the session's single event pump. A slow subscriber overflows only
+its own channel and is always disconnected with an `EventQueueOverflow` fault
+rather than silently dropping MXAccess events — the pump, the session, and other
+subscribers are unaffected. With `FailFast` in the single-subscriber case (the
+default mode), that overflow additionally faults the whole session; with multiple
+subscribers `FailFast` degrades to a per-subscriber disconnect, matching
+`DisconnectSubscriber`, so one slow consumer cannot fault a session shared by
+healthy subscribers. With `DisconnectSubscriber`, overflow terminates only the
+affected stream while the MXAccess session remains active.
`ReplayBufferCapacity` and `ReplayRetentionSeconds` must each be greater than or
equal to zero (either dimension can be disabled with `0`). A returning subscriber
diff --git a/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs b/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs
index 75f50a2..85581f2 100644
--- a/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs
+++ b/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs
@@ -30,13 +30,17 @@ public sealed class EventStreamService(
/// queue-depth metrics, and the backpressure/overflow policy.
///
///
- /// Overflow detection: the distributor's per-subscriber channel is bounded and the
- /// pump drops (does not block) on a full channel. Worker sequences are contiguous
- /// and the pump preserves order, so a gap between consecutive delivered
- /// values means the pump dropped events for
- /// this slow subscriber — that is the overflow signal that, before Task 4, was a
- /// full per-RPC channel. The FailFast / DisconnectSubscriber semantics are
- /// unchanged. Task 5 takes over the per-subscriber isolation policy.
+ /// Overflow handling (Task 5): the distributor's per-subscriber channel is bounded
+ /// and the pump writes non-blocking. When this subscriber's channel is full the pump
+ /// applies the per-subscriber backpressure policy and completes this subscriber's
+ /// channel with a
+ /// (). That terminal fault
+ /// surfaces here when the reader's MoveNextAsync throws, and — like the
+ /// pre-epic per-RPC overflow — it propagates to the gRPC client unchanged. The
+ /// overflow metric, and (in the legacy single-subscriber FailFast case) the session
+ /// fault + fault metric, are recorded by the distributor's overflow handler so the
+ /// session, the pump, and other subscribers are isolated from this subscriber's
+ /// slowness.
///
///
/// Stream events request.
diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs
index 353b839..414d806 100644
--- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs
+++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs
@@ -2,6 +2,7 @@ using System.Runtime.CompilerServices;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Server.Configuration;
using ZB.MOM.WW.MxGateway.Server.Grpc;
+using ZB.MOM.WW.MxGateway.Server.Metrics;
using ZB.MOM.WW.MxGateway.Server.Workers;
namespace ZB.MOM.WW.MxGateway.Server.Sessions;
@@ -382,7 +383,8 @@ public sealed class GatewaySession
eventOptions.ReplayBufferCapacity,
eventOptions.ReplayRetentionSeconds,
_eventStreaming.DistributorLogger,
- _eventStreaming.TimeProvider);
+ _eventStreaming.TimeProvider,
+ CreateOverflowHandler(eventOptions.BackpressurePolicy));
}
distributor = _eventDistributor;
@@ -409,6 +411,34 @@ public sealed class GatewaySession
return lease;
}
+ // Builds the per-subscriber backpressure handler the distributor invokes when a
+ // subscriber's bounded channel overflows. The distributor always disconnects the
+ // offending subscriber with an EventQueueOverflow fault; this handler adds the
+ // observable side effects, preserving exactly what the pre-epic per-RPC overflow path
+ // emitted:
+ // - always record the queue-overflow metric;
+ // - FailFast in the legacy single-subscriber case (isOnlySubscriber): fault the whole
+ // session and record the fault metric, matching back-compat behavior;
+ // - FailFast with multiple subscribers, or DisconnectSubscriber in any case: do NOT
+ // fault the session — the distributor's disconnect of the one slow subscriber is the
+ // whole remedy, so other subscribers and the pump are unaffected. Multi-subscriber
+ // FailFast deliberately degrades to a disconnect because faulting a shared session on
+ // one slow consumer would punish healthy subscribers.
+ private SubscriberOverflowHandler CreateOverflowHandler(EventBackpressurePolicy policy)
+ {
+ GatewayMetrics metrics = _eventStreaming.Metrics;
+ return isOnlySubscriber =>
+ {
+ metrics.QueueOverflow("grpc-event-stream");
+
+ if (policy == EventBackpressurePolicy.FailFast && isOnlySubscriber)
+ {
+ MarkFaulted($"Session {SessionId} event stream queue overflowed.");
+ metrics.Fault(SessionManagerErrorCode.EventQueueOverflow.ToString());
+ }
+ };
+ }
+
// The distributor's single event source. Drains the worker event stream once (the
// distributor guarantees a single consumer) and maps each frame to the public MxEvent,
// preserving worker order. Mirrors the former ProduceEventsAsync mapping exactly.
diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs
index e3746e6..27dbb91 100644
--- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs
+++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs
@@ -4,6 +4,22 @@ using ZB.MOM.WW.MxGateway.Contracts.Proto;
namespace ZB.MOM.WW.MxGateway.Server.Sessions;
+///
+/// Invoked by the pump (on the pump thread) when a subscriber's bounded channel is full
+/// and the event cannot be written. The handler applies the per-subscriber backpressure
+/// policy: it records the overflow metric and, in the legacy single-subscriber FailFast
+/// case, faults the owning session. It does NOT complete the subscriber's channel — the
+/// distributor always disconnects the offending subscriber with an overflow fault — so
+/// the handler is purely observability plus the session-fault decision.
+///
+///
+/// when the overflowing subscriber is the sole registered
+/// subscriber at the moment of overflow (legacy single-subscriber mode). FailFast faults
+/// the session only in this case; with multiple subscribers FailFast degrades to a
+/// per-subscriber disconnect so one slow consumer never faults a session shared by others.
+///
+public delegate void SubscriberOverflowHandler(bool isOnlySubscriber);
+
///
/// Per-session event pump and fan-out. A single background task drains the
/// session's event source exactly once and fans each event out to
@@ -12,10 +28,13 @@ namespace ZB.MOM.WW.MxGateway.Server.Sessions;
///
///
/// Introduced by Task 2 of the Session Resilience epic; the bounded replay ring
-/// buffer was added by Task 3. The class is NOT yet wired into
-/// GatewaySession or EventStreamService (Task 4), has no
-/// per-subscriber backpressure-isolation policy (Task 5), and does not remove
-/// the single-subscriber guard (Tasks 7/8). The ring buffer supports capacity
+/// buffer was added by Task 3, it was wired into GatewaySession and
+/// EventStreamService by Task 4, and the per-subscriber backpressure-isolation
+/// policy (Task 5) is implemented here: a slow subscriber overflows only its own
+/// bounded channel and the pump applies the policy to that subscriber alone (see
+/// and OnSubscriberOverflow), leaving
+/// the pump, the session, and other subscribers running. The class does not yet
+/// remove the single-subscriber guard (Tasks 7/8). The ring buffer supports capacity
/// eviction (oldest entry dropped when the count exceeds
/// replayBufferCapacity) and age eviction (entries older than
/// replayRetentionSeconds dropped on the next append or query), and is
@@ -57,6 +76,7 @@ public sealed class SessionEventDistributor : IAsyncDisposable
private readonly string _sessionId;
private readonly Func> _eventSourceFactory;
private readonly int _subscriberQueueCapacity;
+ private readonly SubscriberOverflowHandler? _overflowHandler;
private readonly TimeSpan _shutdownTimeout;
private readonly ILogger _logger;
private readonly TimeProvider _timeProvider;
@@ -106,7 +126,8 @@ public sealed class SessionEventDistributor : IAsyncDisposable
string sessionId,
Func> eventSourceFactory,
int subscriberQueueCapacity,
- ILogger logger)
+ ILogger logger,
+ SubscriberOverflowHandler? overflowHandler = null)
: this(
sessionId,
eventSourceFactory,
@@ -114,7 +135,8 @@ public sealed class SessionEventDistributor : IAsyncDisposable
replayBufferCapacity: 0,
replayRetentionSeconds: 0,
logger,
- TimeProvider.System)
+ TimeProvider.System,
+ overflowHandler)
{
}
@@ -144,6 +166,14 @@ public sealed class SessionEventDistributor : IAsyncDisposable
/// Clock used to timestamp and age-evict replay entries. Inject a fake to make
/// age-eviction deterministic in tests.
///
+ ///
+ /// Optional per-subscriber backpressure handler invoked when a subscriber's bounded
+ /// channel is full. It records the overflow metric and, for the legacy
+ /// single-subscriber FailFast case, faults the owning session. The distributor always
+ /// disconnects the offending subscriber with an overflow fault regardless of the
+ /// handler. When (unit/skeleton use) the offending subscriber is
+ /// still disconnected but no metric/fault side effect runs.
+ ///
public SessionEventDistributor(
string sessionId,
Func> eventSourceFactory,
@@ -151,7 +181,8 @@ public sealed class SessionEventDistributor : IAsyncDisposable
int replayBufferCapacity,
double replayRetentionSeconds,
ILogger logger,
- TimeProvider timeProvider)
+ TimeProvider timeProvider,
+ SubscriberOverflowHandler? overflowHandler = null)
{
ArgumentException.ThrowIfNullOrWhiteSpace(sessionId);
ArgumentNullException.ThrowIfNull(eventSourceFactory);
@@ -164,6 +195,7 @@ public sealed class SessionEventDistributor : IAsyncDisposable
_sessionId = sessionId;
_eventSourceFactory = eventSourceFactory;
_subscriberQueueCapacity = subscriberQueueCapacity;
+ _overflowHandler = overflowHandler;
_shutdownTimeout = DefaultShutdownTimeout;
_replayBufferCapacity = replayBufferCapacity;
_ageEvictionEnabled = replayRetentionSeconds > 0;
@@ -214,11 +246,15 @@ public sealed class SessionEventDistributor : IAsyncDisposable
// (one gRPC stream / dashboard subscriber). Synchronous continuations are
// disabled so a slow reader can never stall the pump on its completion.
//
- // FullMode is Wait but the pump currently writes with TryWrite (drop-on-full):
- // these are deliberately opposite policies and only a placeholder. Task 5 owns
- // the overflow policy and will reconcile them by either switching the pump to
- // WriteAsync (true backpressure, honouring Wait) or changing this to a Drop mode.
- // Do not "fix" the mismatch here — leave the decision to Task 5.
+ // The pump MUST stay non-blocking: it writes with the non-blocking TryWrite so one
+ // slow reader can never stall the single pump that feeds every subscriber. FullMode
+ // is deliberately Wait — NOT because the pump ever blocks (it never calls the blocking
+ // WriteAsync overload), but because Wait is the only BoundedChannelFullMode under
+ // which TryWrite returns false when the channel is full. That false return IS the
+ // overflow signal the pump needs to apply the per-subscriber backpressure policy. The
+ // Drop* modes would make TryWrite silently succeed-and-drop, hiding overflow and
+ // re-introducing the silent data loss this task removes. So: Wait mode + TryWrite =
+ // a non-blocking pump that still detects a full subscriber channel.
Channel channel = Channel.CreateBounded(
new BoundedChannelOptions(_subscriberQueueCapacity)
{
@@ -321,20 +357,15 @@ public sealed class SessionEventDistributor : IAsyncDisposable
// which matches "late subscribers see events after they register".
foreach (Subscriber subscriber in _subscribers.Values)
{
- // TODO(Task 5): define overflow policy (per-subscriber isolation —
- // drop / disconnect / fault that one subscriber). For the Task 2
- // skeleton, a non-blocking TryWrite that silently drops on a full
- // channel is the placeholder so one slow reader never stalls the pump.
+ // Non-blocking write: TryWrite never blocks the pump on a slow reader.
+ // A false return means this subscriber's bounded channel is full — the
+ // per-subscriber overflow signal. We apply the backpressure policy to
+ // THIS subscriber only; the pump, the session, and every other subscriber
+ // keep running. Logs identifiers (worker sequence, subscriber id, session)
+ // only, never the event payload or tag values.
if (!subscriber.Channel.Writer.TryWrite(mxEvent))
{
- // Visibility only — Task 5 owns the actual drop/backpressure policy.
- // Logs identifiers (worker sequence, subscriber id, session) only,
- // never the event payload or tag values.
- _logger.LogDebug(
- "Event distributor dropped event (worker sequence {WorkerSequence}) for subscriber {SubscriberId} in session {SessionId}: channel full.",
- mxEvent.WorkerSequence,
- subscriber.Id,
- _sessionId);
+ OnSubscriberOverflow(subscriber, mxEvent.WorkerSequence);
}
}
}
@@ -357,6 +388,52 @@ public sealed class SessionEventDistributor : IAsyncDisposable
}
}
+ // Applies the per-subscriber backpressure policy when a subscriber's bounded channel is
+ // full. Runs on the pump thread. The offending subscriber is ALWAYS disconnected with an
+ // overflow fault and unregistered, so it can never wedge the pump again; the overflow
+ // handler decides the observable side effects (overflow metric, and — for legacy
+ // single-subscriber FailFast — faulting the owning session). Multi-subscriber FailFast
+ // intentionally degrades to a plain disconnect (see SubscriberOverflowHandler docs): one
+ // slow consumer must not fault a session shared by other healthy subscribers.
+ private void OnSubscriberOverflow(Subscriber subscriber, ulong workerSequence)
+ {
+ // Snapshot whether this is the sole subscriber BEFORE we unregister it. This is the
+ // legacy single-subscriber mode used by the single-subscriber FailFast back-compat path.
+ bool isOnlySubscriber = _subscribers.Count == 1;
+
+ _logger.LogDebug(
+ "Event distributor disconnecting subscriber {SubscriberId} in session {SessionId} after queue overflow (worker sequence {WorkerSequence}).",
+ subscriber.Id,
+ _sessionId,
+ workerSequence);
+
+ // Observability + session-fault decision. Errors here must not stall the pump or
+ // leave the subscriber attached, so the disconnect below runs regardless.
+ try
+ {
+ _overflowHandler?.Invoke(isOnlySubscriber);
+ }
+ catch (Exception exception)
+ {
+ _logger.LogError(
+ exception,
+ "Event distributor overflow handler threw for session {SessionId}; disconnecting subscriber {SubscriberId} anyway.",
+ _sessionId,
+ subscriber.Id);
+ }
+
+ // Disconnect ONLY this subscriber: complete its channel with the overflow fault and
+ // remove it from the fan-out set. Its gRPC reader's MoveNextAsync then throws the
+ // SessionManagerException, which EventStreamService surfaces to the client exactly as
+ // the pre-epic per-RPC overflow did. The pump and every other subscriber are untouched.
+ if (_subscribers.TryRemove(subscriber.Id, out _))
+ {
+ subscriber.Channel.Writer.TryComplete(new SessionManagerException(
+ SessionManagerErrorCode.EventQueueOverflow,
+ $"Session {_sessionId} event stream queue overflowed."));
+ }
+ }
+
private void CompleteAllSubscribers(Exception? error)
{
foreach (Subscriber subscriber in _subscribers.Values)
diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventStreaming.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventStreaming.cs
index 5ced3ee..9b24a68 100644
--- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventStreaming.cs
+++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventStreaming.cs
@@ -1,6 +1,7 @@
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.MxGateway.Server.Configuration;
using ZB.MOM.WW.MxGateway.Server.Grpc;
+using ZB.MOM.WW.MxGateway.Server.Metrics;
namespace ZB.MOM.WW.MxGateway.Server.Sessions;
@@ -23,20 +24,28 @@ namespace ZB.MOM.WW.MxGateway.Server.Sessions;
///
/// Logger for the distributor pump lifecycle.
/// Clock used to timestamp and age-evict replay entries.
+///
+/// Gateway metrics sink used by the session's per-subscriber overflow handler to record
+/// the queue-overflow counter and, for legacy single-subscriber FailFast, the session
+/// fault. Carrying it here keeps the distributor decoupled from the metrics type while
+/// preserving the observability the pre-epic per-RPC overflow path emitted.
+///
public sealed record SessionEventStreaming(
MxAccessGrpcMapper Mapper,
EventOptions EventOptions,
ILogger DistributorLogger,
- TimeProvider TimeProvider)
+ TimeProvider TimeProvider,
+ GatewayMetrics Metrics)
{
///
/// Defaults used when a session is constructed without explicit streaming
/// dependencies (unit tests). Uses a fresh mapper, default event options, a no-op
- /// logger, and the system clock.
+ /// logger, the system clock, and a fresh metrics sink.
///
public static SessionEventStreaming Default { get; } = new(
new MxAccessGrpcMapper(),
new EventOptions(),
NullLogger.Instance,
- TimeProvider.System);
+ TimeProvider.System,
+ new GatewayMetrics());
}
diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManager.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManager.cs
index 172c05c..507b42a 100644
--- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManager.cs
+++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManager.cs
@@ -450,7 +450,8 @@ public sealed class SessionManager : ISessionManager
_eventMapper,
_options.Events,
_distributorLogger,
- _timeProvider);
+ _timeProvider,
+ _metrics);
return new GatewaySession(
sessionId,
diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs
index cedbcb6..33a3d3b 100644
--- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs
+++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs
@@ -157,61 +157,96 @@ public sealed class EventStreamServiceTests
await WaitUntilAsync(() => metrics.GetSnapshot().GrpcEventStreamQueueDepth == 0);
}
- /// Verifies that event queue overflow faults the session and reports the overflow metric.
- // TODO(Task 5): re-enable and re-target this to per-subscriber backpressure isolation.
- [Fact(Skip = "Backpressure/overflow policy moved into SessionEventDistributor in Task 4; re-enabled and re-targeted to per-subscriber isolation in Task 5.")]
+ ///
+ /// Re-targeted in Task 5: a per-subscriber channel overflow in the session's
+ /// faults the whole session under the legacy
+ /// single-subscriber FailFast policy (the default, single-subscriber mode) and records
+ /// the overflow + fault metrics. The distributor completes this subscriber's channel
+ /// with the overflow fault, which surfaces here as the same
+ /// the pre-epic per-RPC
+ /// overflow produced.
+ ///
+ [Fact]
public async Task StreamEventsAsync_WhenStreamQueueOverflows_FaultsSessionAndReportsOverflow()
{
FakeWorkerClient workerClient = new();
- GatewaySession session = CreateReadySession(workerClient);
using GatewayMetrics metrics = new();
+ GatewaySession session = CreateReadySession(
+ workerClient,
+ queueCapacity: 1,
+ metrics: metrics,
+ backpressurePolicy: EventBackpressurePolicy.FailFast);
EventStreamService service = CreateService(
new FakeSessionManager(session),
metrics,
queueCapacity: 1);
- workerClient.Events.Add(CreateWorkerEvent(sequence: 1, MxEventFamily.OnDataChange));
- workerClient.Events.Add(CreateWorkerEvent(sequence: 2, MxEventFamily.OnDataChange));
- workerClient.Events.Add(CreateWorkerEvent(sequence: 3, MxEventFamily.OnDataChange));
+ for (ulong sequence = 1; sequence <= 50; sequence++)
+ {
+ workerClient.Events.Add(CreateWorkerEvent(sequence, MxEventFamily.OnDataChange));
+ }
+
workerClient.CompleteAfterConfiguredEvents = true;
await using IAsyncEnumerator subscriber = service
.StreamEventsAsync(CreateRequest(session.SessionId), CancellationToken.None)
.GetAsyncEnumerator();
- Assert.True(await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout));
- await WaitUntilAsync(() => session.State == SessionState.Faulted);
+ // The pump fans 50 events into a capacity-1 subscriber channel faster than this
+ // single reader drains, so one of the reads observes the terminal overflow fault.
SessionManagerException exception = await Assert.ThrowsAsync(
- async () => await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout));
+ async () =>
+ {
+ while (await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout))
+ {
+ }
+ });
Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, exception.ErrorCode);
+ await WaitUntilAsync(() => session.State == SessionState.Faulted);
Assert.Equal(SessionState.Faulted, session.State);
- Assert.Equal(1, metrics.GetSnapshot().QueueOverflows);
- Assert.Equal(1, metrics.GetSnapshot().Faults);
+ GatewayMetricsSnapshot snapshot = metrics.GetSnapshot();
+ Assert.Equal(1, snapshot.QueueOverflows);
+ Assert.Equal(1, snapshot.Faults);
}
- /// Verifies that the disconnect backpressure policy disconnects the subscriber without faulting the session.
- // TODO(Task 5): re-enable and re-target this to per-subscriber backpressure isolation.
- [Fact(Skip = "Backpressure/overflow policy moved into SessionEventDistributor in Task 4; re-enabled and re-targeted to per-subscriber isolation in Task 5.")]
+ ///
+ /// Re-targeted in Task 5: under the DisconnectSubscriber policy a per-subscriber
+ /// channel overflow disconnects only that subscriber's stream (terminal
+ /// ) and records the overflow
+ /// metric, but leaves the session and records no
+ /// fault. The session, pump, and any other subscribers are unaffected.
+ ///
+ [Fact]
public async Task StreamEventsAsync_WhenStreamQueueOverflowsWithDisconnectPolicy_LeavesSessionReady()
{
FakeWorkerClient workerClient = new();
- GatewaySession session = CreateReadySession(workerClient);
using GatewayMetrics metrics = new();
+ GatewaySession session = CreateReadySession(
+ workerClient,
+ queueCapacity: 1,
+ metrics: metrics,
+ backpressurePolicy: EventBackpressurePolicy.DisconnectSubscriber);
EventStreamService service = CreateService(
new FakeSessionManager(session),
metrics,
queueCapacity: 1,
backpressurePolicy: EventBackpressurePolicy.DisconnectSubscriber);
- workerClient.Events.Add(CreateWorkerEvent(sequence: 1, MxEventFamily.OnDataChange));
- workerClient.Events.Add(CreateWorkerEvent(sequence: 2, MxEventFamily.OnDataChange));
- workerClient.Events.Add(CreateWorkerEvent(sequence: 3, MxEventFamily.OnDataChange));
+ for (ulong sequence = 1; sequence <= 50; sequence++)
+ {
+ workerClient.Events.Add(CreateWorkerEvent(sequence, MxEventFamily.OnDataChange));
+ }
+
workerClient.CompleteAfterConfiguredEvents = true;
await using IAsyncEnumerator subscriber = service
.StreamEventsAsync(CreateRequest(session.SessionId), CancellationToken.None)
.GetAsyncEnumerator();
- Assert.True(await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout));
SessionManagerException exception = await Assert.ThrowsAsync(
- async () => await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout));
+ async () =>
+ {
+ while (await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout))
+ {
+ }
+ });
Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, exception.ErrorCode);
Assert.Equal(SessionState.Ready, session.State);
@@ -395,8 +430,14 @@ public sealed class EventStreamServiceTests
private static GatewaySession CreateReadySession(
FakeWorkerClient workerClient,
string sessionId = "session-events",
- int queueCapacity = 8)
+ int queueCapacity = 8,
+ GatewayMetrics? metrics = null,
+ EventBackpressurePolicy backpressurePolicy = EventBackpressurePolicy.FailFast)
{
+ // The per-subscriber overflow policy now lives in the session's
+ // SessionEventDistributor, so the session must share the same metrics sink and
+ // backpressure policy the overflow assertions observe. queueCapacity flows into the
+ // distributor's per-subscriber channel bound, which is what overflows.
GatewaySession session = new(
sessionId,
GatewayContractInfo.DefaultBackendName,
@@ -413,9 +454,14 @@ public sealed class EventStreamServiceTests
DateTimeOffset.UtcNow,
new SessionEventStreaming(
new MxAccessGrpcMapper(),
- new EventOptions { QueueCapacity = queueCapacity },
+ new EventOptions
+ {
+ QueueCapacity = queueCapacity,
+ BackpressurePolicy = backpressurePolicy,
+ },
NullLogger.Instance,
- TimeProvider.System));
+ TimeProvider.System,
+ metrics ?? new GatewayMetrics()));
session.AttachWorkerClient(workerClient);
session.MarkReady();
diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionTests.cs
index ae03b4e..45363ae 100644
--- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionTests.cs
+++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionTests.cs
@@ -3,6 +3,7 @@ using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Server.Configuration;
using ZB.MOM.WW.MxGateway.Server.Grpc;
+using ZB.MOM.WW.MxGateway.Server.Metrics;
using ZB.MOM.WW.MxGateway.Server.Sessions;
using ZB.MOM.WW.MxGateway.Server.Workers;
@@ -260,7 +261,8 @@ public sealed class GatewaySessionTests
new MxAccessGrpcMapper(),
new EventOptions { QueueCapacity = 8 },
NullLogger.Instance,
- TimeProvider.System));
+ TimeProvider.System,
+ new GatewayMetrics()));
session.AttachWorkerClient(workerClient);
session.MarkReady();
return session;
diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs
index f6cb53a..52fd7a3 100644
--- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs
+++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs
@@ -328,6 +328,73 @@ public sealed class SessionEventDistributorTests
Assert.Empty(replay);
}
+ [Fact]
+ public async Task SlowSubscriberOverflow_DisconnectsOnlyThatSubscriber_PumpAndOtherKeepRunning()
+ {
+ // Per-subscriber backpressure isolation (Task 5): one subscriber stops reading and
+ // overflows its own tiny channel; it is disconnected with an EventQueueOverflow fault
+ // while a second, healthy subscriber keeps receiving and the pump keeps pumping.
+ Channel source = Channel.CreateUnbounded();
+ int overflowCalls = 0;
+ bool? observedIsOnlySubscriber = null;
+ await using SessionEventDistributor distributor = new(
+ "session-test",
+ ct => source.Reader.ReadAllAsync(ct),
+ subscriberQueueCapacity: 2,
+ replayBufferCapacity: 1024,
+ replayRetentionSeconds: 0,
+ NullLogger.Instance,
+ TimeProvider.System,
+ isOnlySubscriber =>
+ {
+ Interlocked.Increment(ref overflowCalls);
+ observedIsOnlySubscriber = isOnlySubscriber;
+ });
+ await distributor.StartAsync(CancellationToken.None);
+
+ // Slow subscriber: registered but never read, so its capacity-2 channel fills.
+ using IEventSubscriberLease slow = distributor.Register();
+ // Healthy subscriber: drains promptly throughout.
+ using IEventSubscriberLease healthy = distributor.Register();
+
+ // Push more events than the slow subscriber's channel can hold while the healthy one
+ // keeps up. The slow channel overflows; the healthy channel does not.
+ for (ulong sequence = 1; sequence <= 10; sequence++)
+ {
+ source.Writer.TryWrite(Event(sequence));
+ MxEvent received = await ReadOneAsync(healthy.Reader);
+ Assert.Equal(sequence, received.WorkerSequence);
+ }
+
+ // The slow subscriber is disconnected with the overflow fault.
+ SessionManagerException fault = await Assert.ThrowsAsync(
+ async () => await DrainUntilFaultAsync(slow.Reader));
+ Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, fault.ErrorCode);
+
+ // Two subscribers were registered at overflow time, so isOnlySubscriber is false.
+ Assert.Equal(1, overflowCalls);
+ Assert.False(observedIsOnlySubscriber);
+ Assert.Equal(1, distributor.SubscriberCount);
+
+ // The pump is still running and the healthy subscriber still receives new events.
+ source.Writer.TryWrite(Event(11));
+ MxEvent afterOverflow = await ReadOneAsync(healthy.Reader);
+ Assert.Equal(11ul, afterOverflow.WorkerSequence);
+ }
+
+ private static async Task DrainUntilFaultAsync(ChannelReader reader)
+ {
+ // Drains any buffered events, then surfaces the channel's completion fault (if any)
+ // by awaiting the final read past the buffered tail.
+ while (true)
+ {
+ await reader.WaitToReadAsync().AsTask().WaitAsync(ReadTimeout);
+ while (reader.TryRead(out _))
+ {
+ }
+ }
+ }
+
private static SessionEventDistributor CreateDistributor(ChannelReader source)
=> CreateDistributor(source, replayBufferCapacity: 1024, replayRetentionSeconds: 300);