From 4f43733b968b6b0f52bbcbce8fb4d95b89c66d19 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 15 Jun 2026 13:46:37 -0400 Subject: [PATCH] test(sessions): document overflow race safety + close backpressure coverage gaps - Issue 1: document the isOnlySubscriber snapshot race-safety assumption in OnSubscriberOverflow; flags the Task 7/8 revisit point explicitly. - Issue 2: pin StreamDisconnects==1 in the FailFast overflow test so a regression dropping the StreamDisconnected("Detached") finally call is caught. - Issue 3: replace plain int/bool? reads in SlowSubscriberOverflow test with Volatile.Read/Write + Interlocked.Increment stores to close the C# memory model data race on overflowCalls and observedIsOnlySubscriber. - Issue 4: add SlowSubscriberOverflow_WithMultipleSubscribers_... distributor test pinning that isOnlySubscriber==false disables the session-fault path; includes TODO(Task 8) note for the GatewaySession-level assertion. - Issue 5: reword SubscriberOverflowHandler XML doc to make explicit that the handler must NOT complete the subscriber's channel; the distributor owns that. --- .../Sessions/SessionEventDistributor.cs | 26 ++++-- .../Gateway/Grpc/EventStreamServiceTests.cs | 3 + .../Sessions/SessionEventDistributorTests.cs | 84 ++++++++++++++++++- 3 files changed, 102 insertions(+), 11 deletions(-) diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs index 27dbb91..6758a4d 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs @@ -6,11 +6,11 @@ namespace ZB.MOM.WW.MxGateway.Server.Sessions; /// /// Invoked by the pump (on the pump thread) when a subscriber's bounded channel is full -/// and the event cannot be written. 
The handler applies the per-subscriber backpressure -/// policy: it records the overflow metric and, in the legacy single-subscriber FailFast -/// case, faults the owning session. It does NOT complete the subscriber's channel — the -/// distributor always disconnects the offending subscriber with an overflow fault — so -/// the handler is purely observability plus the session-fault decision. +/// and the event cannot be written. The handler applies policy side-effects only: +/// it records the overflow metric and, in the legacy single-subscriber FailFast case, +/// faults the owning session. The handler MUST NOT complete the subscriber's channel — +/// the distributor performs the disconnect and channel-completion unconditionally, +/// regardless of what the handler does. /// /// /// when the overflowing subscriber is the sole registered @@ -397,8 +397,20 @@ public sealed class SessionEventDistributor : IAsyncDisposable // slow consumer must not fault a session shared by other healthy subscribers. private void OnSubscriberOverflow(Subscriber subscriber, ulong workerSequence) { - // Snapshot whether this is the sole subscriber BEFORE we unregister it. This is the - // legacy single-subscriber mode used by the single-subscriber FailFast back-compat path. + // Snapshot whether this is the sole subscriber BEFORE we unregister it. This drives + // the FailFast-fault-session-vs-disconnect decision: FailFast only faults the session + // when the overflowing subscriber is the sole subscriber. + // + // This snapshot is safe in v1 because AllowMultipleEventSubscribers=false is enforced + // by the validator and the single-subscriber guard in AttachEventSubscriber — a + // concurrent second registration is impossible, so the false-FailFast race (two + // subscribers, one overflows, Count reads as 1 after the other concurrently unregisters, + // FailFast wrongly faults the session) cannot occur today. 
+ // + // REVISIT (Task 7/8): when multi-subscriber is enabled the guard is removed and the + // race window opens — a second subscriber registering or unregistering concurrently + // with this read could leave Count reading as 1 here while the session is shared, + // producing a false FailFast that faults it. Resolve before enabling multi-subscriber. bool isOnlySubscriber = _subscribers.Count == 1; _logger.LogDebug( diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs index 33a3d3b..8dd8c7e 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs @@ -206,6 +206,9 @@ public sealed class EventStreamServiceTests GatewayMetricsSnapshot snapshot = metrics.GetSnapshot(); Assert.Equal(1, snapshot.QueueOverflows); Assert.Equal(1, snapshot.Faults); + // The finally block in StreamEventsAsync calls StreamDisconnected("Detached") on the + // overflow+fault path too; pin it here so a regression removing that call is caught. + Assert.Equal(1, snapshot.StreamDisconnects); } /// diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs index 52fd7a3..835f19f 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs @@ -336,7 +336,12 @@ public sealed class SessionEventDistributorTests // while a second, healthy subscriber keeps receiving and the pump keeps pumping. Channel source = Channel.CreateUnbounded(); int overflowCalls = 0; - bool? observedIsOnlySubscriber = null; + // Separate locals for the bool value and the "set" flag so both can use + // Volatile.Read/Write; Volatile has no overloads for the nullable bool? type. 
+ // Interlocked.Increment on the pump thread is the store for overflowCalls; + // Volatile.Read/Write provide ordering for observedIsOnlySubscriber. + int observedIsOnlySubscriberSet = 0; + bool observedIsOnlySubscriberValue = false; await using SessionEventDistributor distributor = new( "session-test", ct => source.Reader.ReadAllAsync(ct), @@ -348,7 +353,8 @@ public sealed class SessionEventDistributorTests isOnlySubscriber => { Interlocked.Increment(ref overflowCalls); - observedIsOnlySubscriber = isOnlySubscriber; + Volatile.Write(ref observedIsOnlySubscriberValue, isOnlySubscriber); + Volatile.Write(ref observedIsOnlySubscriberSet, 1); }); await distributor.StartAsync(CancellationToken.None); @@ -372,8 +378,11 @@ public sealed class SessionEventDistributorTests Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, fault.ErrorCode); // Two subscribers were registered at overflow time, so isOnlySubscriber is false. - Assert.Equal(1, overflowCalls); - Assert.False(observedIsOnlySubscriber); + // Use Volatile.Read so the test-thread reads are ordered after the pump-thread + // writes, avoiding a data race under the C# memory model. + Assert.Equal(1, Volatile.Read(ref overflowCalls)); + Assert.Equal(1, Volatile.Read(ref observedIsOnlySubscriberSet)); + Assert.False(Volatile.Read(ref observedIsOnlySubscriberValue)); Assert.Equal(1, distributor.SubscriberCount); // The pump is still running and the healthy subscriber still receives new events. 
@@ -382,6 +391,73 @@ public sealed class SessionEventDistributorTests Assert.Equal(11ul, afterOverflow.WorkerSequence); } + [Fact] + public async Task SlowSubscriberOverflow_WithMultipleSubscribers_HandlerSeesIsOnlySubscriberFalse_OtherKeepsReceiving() + { + // Distributor-level pin for "FailFast with multiple subscribers degrades to + // disconnect-only (no session fault)": when the overflowing subscriber is NOT the + // sole subscriber, isOnlySubscriber is false, so a FailFast-wired handler must NOT + // fault the session. This test drives the distributor directly (without GatewaySession) + // with two subscribers and a FailFast-style overflow handler seam, overflows the slow + // one, and asserts (a) isOnlySubscriber==false, (b) the other subscriber keeps + // receiving, and (c) the pump keeps running — all without a GatewaySession. + // + // TODO(Task 8): add a GatewaySession-level "session stays Ready" assertion once + // multi-subscriber config is enabled by the Tasks 7/8 validator/guard change. + Channel source = Channel.CreateUnbounded(); + bool handlerFiredWithFalse = false; + bool sessionFaultWouldBeCalled = false; // tracks if a FailFast path would fault + await using SessionEventDistributor distributor = new( + "session-multi-sub", + ct => source.Reader.ReadAllAsync(ct), + subscriberQueueCapacity: 2, + replayBufferCapacity: 0, + replayRetentionSeconds: 0, + NullLogger.Instance, + TimeProvider.System, + isOnlySubscriber => + { + if (!isOnlySubscriber) + { + // Multi-subscriber: FailFast degrades to disconnect-only. + Volatile.Write(ref handlerFiredWithFalse, true); + } + else + { + // Single-subscriber: FailFast would fault the session — must not happen here. + Volatile.Write(ref sessionFaultWouldBeCalled, true); + } + }); + await distributor.StartAsync(CancellationToken.None); + + // Slow subscriber: never reads, so capacity-2 channel overflows quickly. 
+ using IEventSubscriberLease slow = distributor.Register(); + // Healthy subscriber: drains every event promptly. + using IEventSubscriberLease healthy = distributor.Register(); + + // Drive enough events to overflow the slow subscriber's channel. + for (ulong sequence = 1; sequence <= 10; sequence++) + { + source.Writer.TryWrite(Event(sequence)); + _ = await ReadOneAsync(healthy.Reader); + } + + // Slow subscriber is disconnected with the overflow fault. + SessionManagerException fault = await Assert.ThrowsAsync( + async () => await DrainUntilFaultAsync(slow.Reader)); + Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, fault.ErrorCode); + + // The handler saw isOnlySubscriber==false (multi-subscriber degradation path). + Assert.True(Volatile.Read(ref handlerFiredWithFalse)); + // The FailFast session-fault branch was NOT taken (session stays Ready equivalent). + Assert.False(Volatile.Read(ref sessionFaultWouldBeCalled)); + + // The pump and healthy subscriber are unaffected. + source.Writer.TryWrite(Event(11)); + MxEvent afterOverflow = await ReadOneAsync(healthy.Reader); + Assert.Equal(11ul, afterOverflow.WorkerSequence); + } + private static async Task DrainUntilFaultAsync(ChannelReader reader) { // Drains any buffered events, then surfaces the channel's completion fault (if any)