diff --git a/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs b/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs index fa53426..75f50a2 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs @@ -53,7 +53,10 @@ public sealed class EventStreamService( $"Session {request.SessionId} was not found."); } - using IEventSubscriberLease subscriber = session.AttachEventSubscriber( + // No `using` here — subscriber.Dispose() is called exactly once in the finally + // block below, which also disposes the reader. A `using` declaration would add a + // second Dispose on the same path and double-decrement the session subscriber count. + IEventSubscriberLease subscriber = session.AttachEventSubscriber( options.Value.Sessions.AllowMultipleEventSubscribers); int streamQueueDepth = 0; diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs index 9d58d62..353b839 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs @@ -399,6 +399,10 @@ public sealed class GatewaySession IEventSubscriberLease lease = distributor.Register(); if (startNow) { + // StartAsync only schedules the pump via Task.Run and returns a completed task; + // it does not perform any async I/O itself. The sync-over-async call here is + // therefore safe and will not deadlock. Do not make StartAsync truly async + // (i.e., await real I/O before returning) without also changing this call site. distributor.StartAsync(CancellationToken.None).GetAwaiter().GetResult(); } @@ -413,7 +417,6 @@ public sealed class GatewaySession { MxAccessGrpcMapper mapper = _eventStreaming.Mapper; await foreach (WorkerEvent workerEvent in ReadEventsAsync(cancellationToken) - .WithCancellation(cancellationToken) .ConfigureAwait(false)) { yield return mapper.MapEvent(workerEvent); @@ -1236,7 +1239,10 @@ public sealed class GatewaySession private sealed class EventSubscriberLease(GatewaySession session, IEventSubscriberLease distributorLease) : IEventSubscriberLease { - private bool _disposed; + // 0 = live, 1 = disposed. Interlocked so concurrent stream-completion + + // client-cancellation paths cannot both call DetachEventSubscriber and + // double-decrement _activeEventSubscriberCount to -1. + private int _leaseDisposed; /// public System.Threading.Channels.ChannelReader Reader => distributorLease.Reader; @@ -1249,14 +1255,11 @@ public sealed class GatewaySession /// public void Dispose() { - if (_disposed) + if (Interlocked.Exchange(ref _leaseDisposed, 1) == 0) { - return; + distributorLease.Dispose(); + session.DetachEventSubscriber(); } - - _disposed = true; - distributorLease.Dispose(); - session.DetachEventSubscriber(); } } } diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionTests.cs index bae60d3..ae03b4e 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionTests.cs @@ -1,5 +1,8 @@ using System.Runtime.CompilerServices; +using Microsoft.Extensions.Logging.Abstractions; using ZB.MOM.WW.MxGateway.Contracts.Proto; +using ZB.MOM.WW.MxGateway.Server.Configuration; +using ZB.MOM.WW.MxGateway.Server.Grpc; using ZB.MOM.WW.MxGateway.Server.Sessions; using ZB.MOM.WW.MxGateway.Server.Workers; @@ -156,6 +159,66 @@ public sealed class GatewaySessionTests await session.DisposeAsync(); } + /// + /// Issue-1 regression. Concurrent Dispose() calls on the same + /// — as can happen when a gRPC stream + /// completion and a client cancellation both fire at the same time — must + /// decrement _activeEventSubscriberCount exactly once, never to −1. + /// A negative count permanently blocks future subscribers because + /// AttachEventSubscriber(allowMultipleSubscribers:false) gates on + /// _activeEventSubscriberCount > 0. After both racing disposes finish, + /// the count must be exactly 0 and a subsequent single-subscriber attach must + /// succeed. + /// + [Fact] + public async Task EventSubscriberLease_ConcurrentDispose_DecrementsCountExactlyOnce() + { + const int Concurrency = 16; + const int Iterations = 200; + TimeSpan testTimeout = TimeSpan.FromSeconds(10); + + FakeWorkerClient workerClient = new(); + GatewaySession session = CreateReadySessionWithEventStreaming(workerClient); + + for (int i = 0; i < Iterations; i++) + { + // Attach one subscriber; this increments _activeEventSubscriberCount to 1. + IEventSubscriberLease lease = session.AttachEventSubscriber( + allowMultipleSubscribers: false); + + // Race Concurrency threads all calling Dispose() on the same lease. + // Only one must actually run DetachEventSubscriber. + using SemaphoreSlim gate = new(0); + Task[] tasks = new Task[Concurrency]; + for (int t = 0; t < Concurrency; t++) + { + tasks[t] = Task.Run(async () => + { + // All threads wait at the gate so they start as simultaneously + // as the scheduler allows, maximising the race window. + await gate.WaitAsync(testTimeout); + lease.Dispose(); + }); + } + + gate.Release(Concurrency); + await Task.WhenAll(tasks).WaitAsync(testTimeout); + + // Count must be exactly 0 — not negative — after all disposes. + Assert.Equal(0, session.ActiveEventSubscriberCount); + + // Observable contract: a fresh single subscriber must now be attachable + // (i.e., the guard _activeEventSubscriberCount > 0 is false). + IEventSubscriberLease next = session.AttachEventSubscriber( + allowMultipleSubscribers: false); + next.Dispose(); + Assert.Equal(0, session.ActiveEventSubscriberCount); + } + + await session.CloseAsync("test-done", CancellationToken.None); + await session.DisposeAsync(); + } + private static GatewaySession CreateReadySession(IWorkerClient workerClient) { GatewaySession session = new( @@ -177,6 +240,32 @@ public sealed class GatewaySessionTests return session; } + private static GatewaySession CreateReadySessionWithEventStreaming(IWorkerClient workerClient) + { + GatewaySession session = new( + sessionId: "session-test-concurrent", + backendName: "mxaccess", + pipeName: "mxaccess-gateway-1-session-test-concurrent", + nonce: "nonce", + clientIdentity: "client-1", + ownerKeyId: null, + clientSessionName: "test-session", + clientCorrelationId: "client-correlation-1", + commandTimeout: TimeSpan.FromSeconds(5), + startupTimeout: TimeSpan.FromSeconds(5), + shutdownTimeout: TimeSpan.FromSeconds(5), + leaseDuration: TimeSpan.FromMinutes(30), + openedAt: DateTimeOffset.UtcNow, + eventStreaming: new SessionEventStreaming( + new MxAccessGrpcMapper(), + new EventOptions { QueueCapacity = 8 }, + NullLogger.Instance, + TimeProvider.System)); + session.AttachWorkerClient(workerClient); + session.MarkReady(); + return session; + } + /// /// Minimal worker client that parks until the test /// explicitly releases it. Used to keep