diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs
index 27dbb91..6758a4d 100644
--- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs
+++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs
@@ -6,11 +6,11 @@ namespace ZB.MOM.WW.MxGateway.Server.Sessions;
///
/// Invoked by the pump (on the pump thread) when a subscriber's bounded channel is full
-/// and the event cannot be written. The handler applies the per-subscriber backpressure
-/// policy: it records the overflow metric and, in the legacy single-subscriber FailFast
-/// case, faults the owning session. It does NOT complete the subscriber's channel — the
-/// distributor always disconnects the offending subscriber with an overflow fault — so
-/// the handler is purely observability plus the session-fault decision.
+/// and the event cannot be written. The handler applies policy side-effects only:
+/// it records the overflow metric and, in the legacy single-subscriber FailFast case,
+/// faults the owning session. The handler MUST NOT complete the subscriber's channel —
+/// the distributor performs the disconnect and channel-completion unconditionally,
+/// regardless of what the handler does.
///
///
/// when the overflowing subscriber is the sole registered
@@ -397,8 +397,20 @@ public sealed class SessionEventDistributor : IAsyncDisposable
// slow consumer must not fault a session shared by other healthy subscribers.
private void OnSubscriberOverflow(Subscriber subscriber, ulong workerSequence)
{
- // Snapshot whether this is the sole subscriber BEFORE we unregister it. This is the
- // legacy single-subscriber mode used by the single-subscriber FailFast back-compat path.
+ // Snapshot whether this is the sole subscriber BEFORE we unregister it. This drives
+ // the FailFast-fault-session-vs-disconnect decision: FailFast only faults the session
+ // when the overflowing subscriber is the sole subscriber.
+ //
+ // This snapshot is safe in v1 because AllowMultipleEventSubscribers=false is enforced
+ // by the validator and the single-subscriber guard in AttachEventSubscriber — a
+ // concurrent second registration is impossible, so the false-FailFast race (two
+ // subscribers, one overflows, Count reads as 1 after the other concurrently unregisters,
+ // FailFast wrongly faults the session) cannot occur today.
+ //
+ // REVISIT (Task 7/8): when multi-subscriber is enabled the guard is removed and the
+ // race window opens — a concurrent second registration could cause Count to read as 1
+ // here even with two subscribers, producing a false FailFast that faults a shared
+ // session. Resolve before enabling multi-subscriber.
bool isOnlySubscriber = _subscribers.Count == 1;
_logger.LogDebug(
diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs
index 33a3d3b..8dd8c7e 100644
--- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs
+++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs
@@ -206,6 +206,9 @@ public sealed class EventStreamServiceTests
GatewayMetricsSnapshot snapshot = metrics.GetSnapshot();
Assert.Equal(1, snapshot.QueueOverflows);
Assert.Equal(1, snapshot.Faults);
+ // The finally block in StreamEventsAsync calls StreamDisconnected("Detached") on the
+ // overflow+fault path too; pin it here so a regression removing that call is caught.
+ Assert.Equal(1, snapshot.StreamDisconnects);
}
///
diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs
index 52fd7a3..835f19f 100644
--- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs
+++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs
@@ -336,7 +336,12 @@ public sealed class SessionEventDistributorTests
// while a second, healthy subscriber keeps receiving and the pump keeps pumping.
Channel source = Channel.CreateUnbounded();
int overflowCalls = 0;
- bool? observedIsOnlySubscriber = null;
+ // Separate fields for the bool value and the "set" flag so both can use
+ // Volatile.Read/Write; bool? is not valid for the volatile keyword on a local.
+ // Interlocked.Increment on the pump thread is the store for overflowCalls;
+ // Volatile.Read/Write provide ordering for observedIsOnlySubscriber.
+ int observedIsOnlySubscriberSet = 0;
+ bool observedIsOnlySubscriberValue = false;
await using SessionEventDistributor distributor = new(
"session-test",
ct => source.Reader.ReadAllAsync(ct),
@@ -348,7 +353,8 @@ public sealed class SessionEventDistributorTests
isOnlySubscriber =>
{
Interlocked.Increment(ref overflowCalls);
- observedIsOnlySubscriber = isOnlySubscriber;
+ Volatile.Write(ref observedIsOnlySubscriberValue, isOnlySubscriber);
+ Volatile.Write(ref observedIsOnlySubscriberSet, 1);
});
await distributor.StartAsync(CancellationToken.None);
@@ -372,8 +378,11 @@ public sealed class SessionEventDistributorTests
Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, fault.ErrorCode);
// Two subscribers were registered at overflow time, so isOnlySubscriber is false.
- Assert.Equal(1, overflowCalls);
- Assert.False(observedIsOnlySubscriber);
+ // Use Interlocked.Read / Volatile.Read so the test-thread reads are ordered after the
+ // pump-thread writes, avoiding a data race by the C# memory model.
+ Assert.Equal(1, Volatile.Read(ref overflowCalls));
+ Assert.Equal(1, Volatile.Read(ref observedIsOnlySubscriberSet));
+ Assert.False(Volatile.Read(ref observedIsOnlySubscriberValue));
Assert.Equal(1, distributor.SubscriberCount);
// The pump is still running and the healthy subscriber still receives new events.
@@ -382,6 +391,73 @@ public sealed class SessionEventDistributorTests
Assert.Equal(11ul, afterOverflow.WorkerSequence);
}
+ [Fact]
+ public async Task SlowSubscriberOverflow_WithMultipleSubscribers_HandlerSeesIsOnlySubscriberFalse_OtherKeepsReceiving()
+ {
+ // Distributor-level pin for "FailFast with multiple subscribers degrades to
+ // disconnect-only (no session fault)": when the overflowing subscriber is NOT the
+ // sole subscriber, isOnlySubscriber is false, so a FailFast-wired handler must NOT
+ // fault the session. This test drives the distributor directly (without GatewaySession)
+ // with two subscribers and a FailFast-style overflow handler seam, overflows the slow
+ // one, and asserts (a) isOnlySubscriber==false, (b) the other subscriber keeps
+ // receiving, and (c) the pump keeps running — all without a GatewaySession.
+ //
+ // TODO(Task 8): add a GatewaySession-level "session stays Ready" assertion once
+ // multi-subscriber config is enabled by the Tasks 7/8 validator/guard change.
+ Channel source = Channel.CreateUnbounded();
+ bool handlerFiredWithFalse = false;
+ bool sessionFaultWouldBeCalled = false; // tracks if a FailFast path would fault
+ await using SessionEventDistributor distributor = new(
+ "session-multi-sub",
+ ct => source.Reader.ReadAllAsync(ct),
+ subscriberQueueCapacity: 2,
+ replayBufferCapacity: 0,
+ replayRetentionSeconds: 0,
+ NullLogger.Instance,
+ TimeProvider.System,
+ isOnlySubscriber =>
+ {
+ if (!isOnlySubscriber)
+ {
+ // Multi-subscriber: FailFast degrades to disconnect-only.
+ Volatile.Write(ref handlerFiredWithFalse, true);
+ }
+ else
+ {
+ // Single-subscriber: FailFast would fault the session — must not happen here.
+ Volatile.Write(ref sessionFaultWouldBeCalled, true);
+ }
+ });
+ await distributor.StartAsync(CancellationToken.None);
+
+ // Slow subscriber: never reads, so capacity-2 channel overflows quickly.
+ using IEventSubscriberLease slow = distributor.Register();
+ // Healthy subscriber: drains every event promptly.
+ using IEventSubscriberLease healthy = distributor.Register();
+
+ // Drive enough events to overflow the slow subscriber's channel.
+ for (ulong sequence = 1; sequence <= 10; sequence++)
+ {
+ source.Writer.TryWrite(Event(sequence));
+ _ = await ReadOneAsync(healthy.Reader);
+ }
+
+ // Slow subscriber is disconnected with the overflow fault.
+ SessionManagerException fault = await Assert.ThrowsAsync(
+ async () => await DrainUntilFaultAsync(slow.Reader));
+ Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, fault.ErrorCode);
+
+ // The handler saw isOnlySubscriber==false (multi-subscriber degradation path).
+ Assert.True(Volatile.Read(ref handlerFiredWithFalse));
+ // The FailFast session-fault branch was NOT taken (session stays Ready equivalent).
+ Assert.False(Volatile.Read(ref sessionFaultWouldBeCalled));
+
+ // The pump and healthy subscriber are unaffected.
+ source.Writer.TryWrite(Event(11));
+ MxEvent afterOverflow = await ReadOneAsync(healthy.Reader);
+ Assert.Equal(11ul, afterOverflow.WorkerSequence);
+ }
+
private static async Task DrainUntilFaultAsync(ChannelReader reader)
{
// Drains any buffered events, then surfaces the channel's completion fault (if any)