test(sessions): document overflow race safety + close backpressure coverage gaps
- Issue 1: document the isOnlySubscriber snapshot race-safety assumption in
OnSubscriberOverflow; flags the Task 7/8 revisit point explicitly.
- Issue 2: pin StreamDisconnects==1 in the FailFast overflow test so a
regression dropping the StreamDisconnected("Detached") finally call is caught.
- Issue 3: replace plain int/bool? reads in SlowSubscriberOverflow test with
Volatile.Read/Write + Interlocked.Increment stores to close the C# memory
model data race on overflowCalls and observedIsOnlySubscriber.
- Issue 4: add SlowSubscriberOverflow_WithMultipleSubscribers_... distributor
test pinning that isOnlySubscriber==false disables the session-fault path;
includes TODO(Task 8) note for the GatewaySession-level assertion.
- Issue 5: reword SubscriberOverflowHandler XML doc to make explicit that the
handler must NOT complete the subscriber's channel; the distributor owns that.
This commit is contained in:
@@ -6,11 +6,11 @@ namespace ZB.MOM.WW.MxGateway.Server.Sessions;
|
|||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Invoked by the pump (on the pump thread) when a subscriber's bounded channel is full
|
/// Invoked by the pump (on the pump thread) when a subscriber's bounded channel is full
|
||||||
/// and the event cannot be written. The handler applies the per-subscriber backpressure
|
/// and the event cannot be written. The handler applies policy side-effects only:
|
||||||
/// policy: it records the overflow metric and, in the legacy single-subscriber FailFast
|
/// it records the overflow metric and, in the legacy single-subscriber FailFast case,
|
||||||
/// case, faults the owning session. It does NOT complete the subscriber's channel — the
|
/// faults the owning session. The handler MUST NOT complete the subscriber's channel —
|
||||||
/// distributor always disconnects the offending subscriber with an overflow fault — so
|
/// the distributor performs the disconnect and channel-completion unconditionally,
|
||||||
/// the handler is purely observability plus the session-fault decision.
|
/// regardless of what the handler does.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="isOnlySubscriber">
|
/// <param name="isOnlySubscriber">
|
||||||
/// <see langword="true"/> when the overflowing subscriber is the sole registered
|
/// <see langword="true"/> when the overflowing subscriber is the sole registered
|
||||||
@@ -397,8 +397,20 @@ public sealed class SessionEventDistributor : IAsyncDisposable
|
|||||||
// slow consumer must not fault a session shared by other healthy subscribers.
|
// slow consumer must not fault a session shared by other healthy subscribers.
|
||||||
private void OnSubscriberOverflow(Subscriber subscriber, ulong workerSequence)
|
private void OnSubscriberOverflow(Subscriber subscriber, ulong workerSequence)
|
||||||
{
|
{
|
||||||
// Snapshot whether this is the sole subscriber BEFORE we unregister it. This is the
|
// Snapshot whether this is the sole subscriber BEFORE we unregister it. This drives
|
||||||
// legacy single-subscriber mode used by the single-subscriber FailFast back-compat path.
|
// the FailFast-fault-session-vs-disconnect decision: FailFast only faults the session
|
||||||
|
// when the overflowing subscriber is the sole subscriber.
|
||||||
|
//
|
||||||
|
// This snapshot is safe in v1 because AllowMultipleEventSubscribers=false is enforced
|
||||||
|
// by the validator and the single-subscriber guard in AttachEventSubscriber — a
|
||||||
|
// concurrent second registration is impossible, so the false-FailFast race (two
|
||||||
|
// subscribers, one overflows, Count reads as 1 after the other concurrently unregisters,
|
||||||
|
// FailFast wrongly faults the session) cannot occur today.
|
||||||
|
//
|
||||||
|
// REVISIT (Task 7/8): when multi-subscriber is enabled the guard is removed and the
|
||||||
|
// race window opens — a concurrent second registration could cause Count to read as 1
|
||||||
|
// here even with two subscribers, producing a false FailFast that faults a shared
|
||||||
|
// session. Resolve before enabling multi-subscriber.
|
||||||
bool isOnlySubscriber = _subscribers.Count == 1;
|
bool isOnlySubscriber = _subscribers.Count == 1;
|
||||||
|
|
||||||
_logger.LogDebug(
|
_logger.LogDebug(
|
||||||
|
|||||||
@@ -206,6 +206,9 @@ public sealed class EventStreamServiceTests
|
|||||||
GatewayMetricsSnapshot snapshot = metrics.GetSnapshot();
|
GatewayMetricsSnapshot snapshot = metrics.GetSnapshot();
|
||||||
Assert.Equal(1, snapshot.QueueOverflows);
|
Assert.Equal(1, snapshot.QueueOverflows);
|
||||||
Assert.Equal(1, snapshot.Faults);
|
Assert.Equal(1, snapshot.Faults);
|
||||||
|
// The finally block in StreamEventsAsync calls StreamDisconnected("Detached") on the
|
||||||
|
// overflow+fault path too; pin it here so a regression removing that call is caught.
|
||||||
|
Assert.Equal(1, snapshot.StreamDisconnects);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
|||||||
@@ -336,7 +336,12 @@ public sealed class SessionEventDistributorTests
|
|||||||
// while a second, healthy subscriber keeps receiving and the pump keeps pumping.
|
// while a second, healthy subscriber keeps receiving and the pump keeps pumping.
|
||||||
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
|
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
|
||||||
int overflowCalls = 0;
|
int overflowCalls = 0;
|
||||||
bool? observedIsOnlySubscriber = null;
|
// Separate fields for the bool value and the "set" flag so both can use
|
||||||
|
// Volatile.Read/Write; bool? is not valid for the volatile keyword on a local.
|
||||||
|
// Interlocked.Increment on the pump thread is the store for overflowCalls;
|
||||||
|
// Volatile.Read/Write provide ordering for observedIsOnlySubscriber.
|
||||||
|
int observedIsOnlySubscriberSet = 0;
|
||||||
|
bool observedIsOnlySubscriberValue = false;
|
||||||
await using SessionEventDistributor distributor = new(
|
await using SessionEventDistributor distributor = new(
|
||||||
"session-test",
|
"session-test",
|
||||||
ct => source.Reader.ReadAllAsync(ct),
|
ct => source.Reader.ReadAllAsync(ct),
|
||||||
@@ -348,7 +353,8 @@ public sealed class SessionEventDistributorTests
|
|||||||
isOnlySubscriber =>
|
isOnlySubscriber =>
|
||||||
{
|
{
|
||||||
Interlocked.Increment(ref overflowCalls);
|
Interlocked.Increment(ref overflowCalls);
|
||||||
observedIsOnlySubscriber = isOnlySubscriber;
|
Volatile.Write(ref observedIsOnlySubscriberValue, isOnlySubscriber);
|
||||||
|
Volatile.Write(ref observedIsOnlySubscriberSet, 1);
|
||||||
});
|
});
|
||||||
await distributor.StartAsync(CancellationToken.None);
|
await distributor.StartAsync(CancellationToken.None);
|
||||||
|
|
||||||
@@ -372,8 +378,11 @@ public sealed class SessionEventDistributorTests
|
|||||||
Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, fault.ErrorCode);
|
Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, fault.ErrorCode);
|
||||||
|
|
||||||
// Two subscribers were registered at overflow time, so isOnlySubscriber is false.
|
// Two subscribers were registered at overflow time, so isOnlySubscriber is false.
|
||||||
Assert.Equal(1, overflowCalls);
|
// Use Interlocked.Read / Volatile.Read so the test-thread reads are ordered after the
|
||||||
Assert.False(observedIsOnlySubscriber);
|
// pump-thread writes, avoiding a data race by the C# memory model.
|
||||||
|
Assert.Equal(1, Volatile.Read(ref overflowCalls));
|
||||||
|
Assert.Equal(1, Volatile.Read(ref observedIsOnlySubscriberSet));
|
||||||
|
Assert.False(Volatile.Read(ref observedIsOnlySubscriberValue));
|
||||||
Assert.Equal(1, distributor.SubscriberCount);
|
Assert.Equal(1, distributor.SubscriberCount);
|
||||||
|
|
||||||
// The pump is still running and the healthy subscriber still receives new events.
|
// The pump is still running and the healthy subscriber still receives new events.
|
||||||
@@ -382,6 +391,73 @@ public sealed class SessionEventDistributorTests
|
|||||||
Assert.Equal(11ul, afterOverflow.WorkerSequence);
|
Assert.Equal(11ul, afterOverflow.WorkerSequence);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task SlowSubscriberOverflow_WithMultipleSubscribers_HandlerSeesIsOnlySubscriberFalse_OtherKeepsReceiving()
|
||||||
|
{
|
||||||
|
// Distributor-level pin for "FailFast with multiple subscribers degrades to
|
||||||
|
// disconnect-only (no session fault)": when the overflowing subscriber is NOT the
|
||||||
|
// sole subscriber, isOnlySubscriber is false, so a FailFast-wired handler must NOT
|
||||||
|
// fault the session. This test drives the distributor directly (without GatewaySession)
|
||||||
|
// with two subscribers and a FailFast-style overflow handler seam, overflows the slow
|
||||||
|
// one, and asserts (a) isOnlySubscriber==false, (b) the other subscriber keeps
|
||||||
|
// receiving, and (c) the pump keeps running — all without a GatewaySession.
|
||||||
|
//
|
||||||
|
// TODO(Task 8): add a GatewaySession-level "session stays Ready" assertion once
|
||||||
|
// multi-subscriber config is enabled by the Tasks 7/8 validator/guard change.
|
||||||
|
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
|
||||||
|
bool handlerFiredWithFalse = false;
|
||||||
|
bool sessionFaultWouldBeCalled = false; // tracks if a FailFast path would fault
|
||||||
|
await using SessionEventDistributor distributor = new(
|
||||||
|
"session-multi-sub",
|
||||||
|
ct => source.Reader.ReadAllAsync(ct),
|
||||||
|
subscriberQueueCapacity: 2,
|
||||||
|
replayBufferCapacity: 0,
|
||||||
|
replayRetentionSeconds: 0,
|
||||||
|
NullLogger<SessionEventDistributor>.Instance,
|
||||||
|
TimeProvider.System,
|
||||||
|
isOnlySubscriber =>
|
||||||
|
{
|
||||||
|
if (!isOnlySubscriber)
|
||||||
|
{
|
||||||
|
// Multi-subscriber: FailFast degrades to disconnect-only.
|
||||||
|
Volatile.Write(ref handlerFiredWithFalse, true);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Single-subscriber: FailFast would fault the session — must not happen here.
|
||||||
|
Volatile.Write(ref sessionFaultWouldBeCalled, true);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
await distributor.StartAsync(CancellationToken.None);
|
||||||
|
|
||||||
|
// Slow subscriber: never reads, so capacity-2 channel overflows quickly.
|
||||||
|
using IEventSubscriberLease slow = distributor.Register();
|
||||||
|
// Healthy subscriber: drains every event promptly.
|
||||||
|
using IEventSubscriberLease healthy = distributor.Register();
|
||||||
|
|
||||||
|
// Drive enough events to overflow the slow subscriber's channel.
|
||||||
|
for (ulong sequence = 1; sequence <= 10; sequence++)
|
||||||
|
{
|
||||||
|
source.Writer.TryWrite(Event(sequence));
|
||||||
|
_ = await ReadOneAsync(healthy.Reader);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Slow subscriber is disconnected with the overflow fault.
|
||||||
|
SessionManagerException fault = await Assert.ThrowsAsync<SessionManagerException>(
|
||||||
|
async () => await DrainUntilFaultAsync(slow.Reader));
|
||||||
|
Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, fault.ErrorCode);
|
||||||
|
|
||||||
|
// The handler saw isOnlySubscriber==false (multi-subscriber degradation path).
|
||||||
|
Assert.True(Volatile.Read(ref handlerFiredWithFalse));
|
||||||
|
// The FailFast session-fault branch was NOT taken (session stays Ready equivalent).
|
||||||
|
Assert.False(Volatile.Read(ref sessionFaultWouldBeCalled));
|
||||||
|
|
||||||
|
// The pump and healthy subscriber are unaffected.
|
||||||
|
source.Writer.TryWrite(Event(11));
|
||||||
|
MxEvent afterOverflow = await ReadOneAsync(healthy.Reader);
|
||||||
|
Assert.Equal(11ul, afterOverflow.WorkerSequence);
|
||||||
|
}
|
||||||
|
|
||||||
private static async Task DrainUntilFaultAsync(ChannelReader<MxEvent> reader)
|
private static async Task DrainUntilFaultAsync(ChannelReader<MxEvent> reader)
|
||||||
{
|
{
|
||||||
// Drains any buffered events, then surfaces the channel's completion fault (if any)
|
// Drains any buffered events, then surfaces the channel's completion fault (if any)
|
||||||
|
|||||||
Reference in New Issue
Block a user