fix(sessions): make EventSubscriberLease dispose atomic; dedupe lease dispose
Issue 1: replace plain bool _disposed in EventSubscriberLease with an Interlocked.Exchange int (_leaseDisposed) matching the SubscriberLease pattern in SessionEventDistributor. Concurrent stream-completion + client-cancellation racing Dispose() now decrements _activeEventSubscriberCount exactly once, never to -1. Issue 5: remove the `using` declaration on the subscriber lease in EventStreamService.StreamEventsAsync; the finally block already disposes it alongside the reader, so the using was a redundant second dispose on the same code path. Issue 2: add an inline comment at the StartAsync().GetAwaiter().GetResult() call documenting the sync-over-async invariant (StartAsync only schedules via Task.Run and is synchronous; do not make it truly async without changing this call site). Issue 10: remove the redundant .WithCancellation(cancellationToken) chained on ReadEventsAsync(cancellationToken) in MapWorkerEventsAsync; the [EnumeratorCancellation] token already flows through the direct argument. Issue 9: add EventSubscriberLease_ConcurrentDispose_DecrementsCountExactlyOnce to GatewaySessionTests — 16 concurrent Dispose() calls on the same lease for 200 iterations; asserts count is exactly 0 after each race and a subsequent single-subscriber AttachEventSubscriber succeeds.
This commit is contained in:
@@ -53,7 +53,10 @@ public sealed class EventStreamService(
|
||||
$"Session {request.SessionId} was not found.");
|
||||
}
|
||||
|
||||
using IEventSubscriberLease subscriber = session.AttachEventSubscriber(
|
||||
// No `using` here — subscriber.Dispose() is called exactly once in the finally
|
||||
// block below, which also disposes the reader. A `using` declaration would only
|
||||
// add a redundant second Dispose call on the same path.
|
||||
IEventSubscriberLease subscriber = session.AttachEventSubscriber(
|
||||
options.Value.Sessions.AllowMultipleEventSubscribers);
|
||||
|
||||
int streamQueueDepth = 0;
|
||||
|
||||
@@ -399,6 +399,10 @@ public sealed class GatewaySession
|
||||
IEventSubscriberLease lease = distributor.Register();
|
||||
if (startNow)
|
||||
{
|
||||
// StartAsync only schedules the pump via Task.Run and returns a completed task;
|
||||
// it does not perform any async I/O itself. The sync-over-async call here is
|
||||
// therefore safe and will not deadlock. Do not make StartAsync truly async
|
||||
// (i.e., await real I/O before returning) without also changing this call site.
|
||||
distributor.StartAsync(CancellationToken.None).GetAwaiter().GetResult();
|
||||
}
|
||||
|
||||
@@ -413,7 +417,6 @@ public sealed class GatewaySession
|
||||
{
|
||||
MxAccessGrpcMapper mapper = _eventStreaming.Mapper;
|
||||
await foreach (WorkerEvent workerEvent in ReadEventsAsync(cancellationToken)
|
||||
.WithCancellation(cancellationToken)
|
||||
.ConfigureAwait(false))
|
||||
{
|
||||
yield return mapper.MapEvent(workerEvent);
|
||||
@@ -1236,7 +1239,10 @@ public sealed class GatewaySession
|
||||
private sealed class EventSubscriberLease(GatewaySession session, IEventSubscriberLease distributorLease)
|
||||
: IEventSubscriberLease
|
||||
{
|
||||
private bool _disposed;
|
||||
// 0 = live, 1 = disposed. Interlocked so concurrent stream-completion +
|
||||
// client-cancellation paths cannot both call DetachEventSubscriber and
|
||||
// double-decrement _activeEventSubscriberCount to -1.
|
||||
private int _leaseDisposed;
|
||||
|
||||
/// <inheritdoc />
|
||||
public System.Threading.Channels.ChannelReader<MxEvent> Reader => distributorLease.Reader;
|
||||
@@ -1249,14 +1255,11 @@ public sealed class GatewaySession
|
||||
/// </summary>
|
||||
public void Dispose()
{
    // Atomically flip the flag from 0 (live) to 1 (disposed). Only the caller that
    // observes the previous value 0 runs the teardown below; any other caller,
    // whether concurrent or repeated, returns without touching the session, so
    // DetachEventSubscriber is invoked at most once per lease.
    if (Interlocked.Exchange(ref _leaseDisposed, 1) != 0)
    {
        return;
    }

    distributorLease.Dispose();
    session.DetachEventSubscriber();
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
using System.Runtime.CompilerServices;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using ZB.MOM.WW.MxGateway.Contracts.Proto;
|
||||
using ZB.MOM.WW.MxGateway.Server.Configuration;
|
||||
using ZB.MOM.WW.MxGateway.Server.Grpc;
|
||||
using ZB.MOM.WW.MxGateway.Server.Sessions;
|
||||
using ZB.MOM.WW.MxGateway.Server.Workers;
|
||||
|
||||
@@ -156,6 +159,66 @@ public sealed class GatewaySessionTests
|
||||
await session.DisposeAsync();
|
||||
}
|
||||
|
||||
/// <summary>
/// Regression test for issue 1: many simultaneous <c>Dispose()</c> calls on one
/// <see cref="IEventSubscriberLease"/> must release the subscriber slot exactly once.
/// Every iteration attaches a single subscriber, lets <c>Concurrency</c> tasks race
/// to dispose that same lease, and then verifies that
/// <c>ActiveEventSubscriberCount</c> is exactly 0 and that a fresh
/// single-subscriber attach still succeeds.
/// </summary>
[Fact]
public async Task EventSubscriberLease_ConcurrentDispose_DecrementsCountExactlyOnce()
{
    const int Concurrency = 16;
    const int Iterations = 200;
    TimeSpan testTimeout = TimeSpan.FromSeconds(10);

    GatewaySession session = CreateReadySessionWithEventStreaming(new FakeWorkerClient());

    int remaining = Iterations;
    while (remaining-- > 0)
    {
        // One attached subscriber takes the active-subscriber count to 1.
        IEventSubscriberLease contested = session.AttachEventSubscriber(
            allowMultipleSubscribers: false);

        // Park every racer on a closed gate, then open it for all of them at once so
        // the Dispose() calls overlap as much as the scheduler permits.
        using SemaphoreSlim startGate = new(0);
        Task[] racers = new Task[Concurrency];
        for (int slot = 0; slot < racers.Length; slot++)
        {
            racers[slot] = Task.Run(async () =>
            {
                await startGate.WaitAsync(testTimeout);
                contested.Dispose();
            });
        }

        startGate.Release(Concurrency);
        await Task.WhenAll(racers).WaitAsync(testTimeout);

        // Exactly one racer may have detached the subscriber: the count is 0, not negative.
        Assert.Equal(0, session.ActiveEventSubscriberCount);

        // The slot must be reusable by a new exclusive subscriber.
        IEventSubscriberLease followUp = session.AttachEventSubscriber(
            allowMultipleSubscribers: false);
        followUp.Dispose();
        Assert.Equal(0, session.ActiveEventSubscriberCount);
    }

    await session.CloseAsync("test-done", CancellationToken.None);
    await session.DisposeAsync();
}
|
||||
|
||||
private static GatewaySession CreateReadySession(IWorkerClient workerClient)
|
||||
{
|
||||
GatewaySession session = new(
|
||||
@@ -177,6 +240,32 @@ public sealed class GatewaySessionTests
|
||||
return session;
|
||||
}
|
||||
|
||||
/// <summary>
/// Builds a <see cref="GatewaySession"/> that has event streaming configured
/// (queue capacity 8, null logger, system time provider), attaches
/// <paramref name="workerClient"/> to it and marks it ready.
/// </summary>
/// <param name="workerClient">Worker client to attach to the new session.</param>
/// <returns>The session after <c>AttachWorkerClient</c> and <c>MarkReady</c> have been called.</returns>
private static GatewaySession CreateReadySessionWithEventStreaming(IWorkerClient workerClient)
{
    const string SessionId = "session-test-concurrent";

    // Event-streaming dependencies for the session's distributor.
    SessionEventStreaming streaming = new(
        new MxAccessGrpcMapper(),
        new EventOptions { QueueCapacity = 8 },
        NullLogger<SessionEventDistributor>.Instance,
        TimeProvider.System);

    GatewaySession readySession = new(
        sessionId: SessionId,
        backendName: "mxaccess",
        pipeName: $"mxaccess-gateway-1-{SessionId}",
        nonce: "nonce",
        clientIdentity: "client-1",
        ownerKeyId: null,
        clientSessionName: "test-session",
        clientCorrelationId: "client-correlation-1",
        commandTimeout: TimeSpan.FromSeconds(5),
        startupTimeout: TimeSpan.FromSeconds(5),
        shutdownTimeout: TimeSpan.FromSeconds(5),
        leaseDuration: TimeSpan.FromMinutes(30),
        openedAt: DateTimeOffset.UtcNow,
        eventStreaming: streaming);

    readySession.AttachWorkerClient(workerClient);
    readySession.MarkReady();
    return readySession;
}
|
||||
|
||||
/// <summary>
|
||||
/// Minimal worker client that parks <see cref="ShutdownAsync"/> until the test
|
||||
/// explicitly releases it. Used to keep <see cref="GatewaySession.CloseAsync"/>
|
||||
|
||||
Reference in New Issue
Block a user