Fix remaining reliability findings

This commit is contained in:
Joseph Doherty
2026-04-28 06:38:05 -04:00
parent b0041c5d18
commit 047d875fe6
8 changed files with 200 additions and 217 deletions
@@ -46,8 +46,8 @@ public sealed class EventStreamService(
eventQueue.Writer,
() =>
{
int depth = Interlocked.Increment(ref streamQueueDepth);
metrics.SetGrpcEventStreamQueueDepth(depth);
Interlocked.Increment(ref streamQueueDepth);
metrics.AdjustGrpcEventStreamQueueDepth(1);
},
streamCts.Token);
@@ -55,8 +55,8 @@ public sealed class EventStreamService(
{
await foreach (MxEvent mxEvent in eventQueue.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
{
int depth = Math.Max(0, Interlocked.Decrement(ref streamQueueDepth));
metrics.SetGrpcEventStreamQueueDepth(depth);
Interlocked.Decrement(ref streamQueueDepth);
metrics.AdjustGrpcEventStreamQueueDepth(-1);
yield return mxEvent;
}
@@ -66,9 +66,6 @@ public sealed class EventStreamService(
{
await streamCts.CancelAsync().ConfigureAwait(false);
subscriber.Dispose();
Interlocked.Exchange(ref streamQueueDepth, 0);
metrics.SetGrpcEventStreamQueueDepth(0);
metrics.StreamDisconnected("Detached");
try
{
@@ -84,6 +81,14 @@ public sealed class EventStreamService(
"Event stream producer stopped for session {SessionId}.",
request.SessionId);
}
int remainingDepth = Interlocked.Exchange(ref streamQueueDepth, 0);
if (remainingDepth > 0)
{
metrics.AdjustGrpcEventStreamQueueDepth(-remainingDepth);
}
metrics.StreamDisconnected("Detached");
}
}
@@ -232,6 +232,14 @@ public sealed class GatewayMetrics : IDisposable
}
}
public void AdjustGrpcEventStreamQueueDepth(int delta)
{
lock (_syncRoot)
{
_grpcEventStreamQueueDepth = Math.Max(0, _grpcEventStreamQueueDepth + delta);
}
}
public void RemoveSessionEvents(string sessionId)
{
_eventsBySession.TryRemove(sessionId, out _);
+39 -20
View File
@@ -376,30 +376,49 @@ public sealed class GatewaySession
await _closeLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
if (_state is SessionState.Closed)
try
{
return new SessionCloseResult(SessionId, SessionState.Closed, AlreadyClosed: true);
if (_state is SessionState.Closed)
{
return new SessionCloseResult(SessionId, SessionState.Closed, AlreadyClosed: true);
}
bool alreadyClosing = _closeStarted;
_closeStarted = true;
_state = SessionState.Closing;
if (_workerClient is not null)
{
try
{
await _workerClient.ShutdownAsync(ShutdownTimeout, cancellationToken).ConfigureAwait(false);
}
catch (Exception exception)
{
try
{
_workerClient.Kill(reason);
}
catch (Exception killException)
{
throw new SessionCloseStartedException(
$"Session {SessionId} close failed after worker shutdown started.",
new AggregateException(exception, killException));
}
throw;
}
}
_state = SessionState.Closed;
return new SessionCloseResult(SessionId, SessionState.Closed, alreadyClosing);
}
bool alreadyClosing = _closeStarted;
_closeStarted = true;
_state = SessionState.Closing;
if (_workerClient is not null)
catch (Exception exception) when (exception is not SessionCloseStartedException)
{
try
{
await _workerClient.ShutdownAsync(ShutdownTimeout, cancellationToken).ConfigureAwait(false);
}
catch
{
_workerClient.Kill(reason);
throw;
}
throw new SessionCloseStartedException(
$"Session {SessionId} close failed after the close lock was acquired.",
exception);
}
_state = SessionState.Closed;
return new SessionCloseResult(SessionId, SessionState.Closed, alreadyClosing);
}
finally
{
@@ -0,0 +1,11 @@
namespace MxGateway.Server.Sessions;
internal sealed class SessionCloseStartedException : Exception
{
public SessionCloseStartedException(
string message,
Exception innerException)
: base(message, innerException)
{
}
}
@@ -210,7 +210,11 @@ public sealed class SessionManager : ISessionManager
await RemoveSessionAsync(session).ConfigureAwait(false);
return result;
}
catch (Exception exception)
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
throw;
}
catch (SessionCloseStartedException exception)
{
session.MarkFaulted(exception.Message);
if (!wasClosed)
@@ -111,6 +111,46 @@ public sealed class EventStreamServiceTests
await WaitUntilAsync(() => metrics.GetSnapshot().GrpcEventStreamQueueDepth == 0);
}
[Fact]
public async Task StreamEventsAsync_WithConcurrentStreams_TracksAggregateQueueDepth()
{
FakeWorkerClient firstWorkerClient = new();
FakeWorkerClient secondWorkerClient = new();
GatewaySession firstSession = CreateReadySession(firstWorkerClient, "session-events-1");
GatewaySession secondSession = CreateReadySession(secondWorkerClient, "session-events-2");
using GatewayMetrics metrics = new();
EventStreamService service = CreateService(
new FakeSessionManager(firstSession, secondSession),
metrics,
queueCapacity: 8);
for (ulong sequence = 1; sequence <= 3; sequence++)
{
firstWorkerClient.Events.Add(CreateWorkerEvent(sequence, MxEventFamily.OnDataChange));
secondWorkerClient.Events.Add(CreateWorkerEvent(sequence, MxEventFamily.OnDataChange));
}
firstWorkerClient.CompleteAfterConfiguredEvents = true;
secondWorkerClient.CompleteAfterConfiguredEvents = true;
await using IAsyncEnumerator<MxEvent> firstSubscriber = service
.StreamEventsAsync(CreateRequest(firstSession.SessionId), CancellationToken.None)
.GetAsyncEnumerator();
await using IAsyncEnumerator<MxEvent> secondSubscriber = service
.StreamEventsAsync(CreateRequest(secondSession.SessionId), CancellationToken.None)
.GetAsyncEnumerator();
Assert.True(await firstSubscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout));
Assert.True(await secondSubscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout));
await WaitUntilAsync(() => metrics.GetSnapshot().GrpcEventStreamQueueDepth == 4);
await firstSubscriber.DisposeAsync();
await WaitUntilAsync(() => metrics.GetSnapshot().GrpcEventStreamQueueDepth == 2);
await secondSubscriber.DisposeAsync();
await WaitUntilAsync(() => metrics.GetSnapshot().GrpcEventStreamQueueDepth == 0);
}
[Fact]
public async Task StreamEventsAsync_WhenStreamQueueOverflows_FaultsSessionAndReportsOverflow()
{
@@ -255,10 +295,12 @@ public sealed class EventStreamServiceTests
};
}
private static GatewaySession CreateReadySession(FakeWorkerClient workerClient)
private static GatewaySession CreateReadySession(
FakeWorkerClient workerClient,
string sessionId = "session-events")
{
GatewaySession session = new(
"session-events",
sessionId,
GatewayContractInfo.DefaultBackendName,
"pipe",
"nonce",
@@ -317,22 +359,28 @@ public sealed class EventStreamServiceTests
}
}
private sealed class FakeSessionManager(GatewaySession session) : ISessionManager
private sealed class FakeSessionManager : ISessionManager
{
private readonly IReadOnlyDictionary<string, GatewaySession> _sessions;
public FakeSessionManager(params GatewaySession[] sessions)
{
_sessions = sessions.ToDictionary(session => session.SessionId, StringComparer.Ordinal);
}
public Task<GatewaySession> OpenSessionAsync(
SessionOpenRequest request,
string? clientIdentity,
CancellationToken cancellationToken)
{
return Task.FromResult(session);
return Task.FromResult(_sessions.Values.First());
}
public bool TryGetSession(
string sessionId,
out GatewaySession gatewaySession)
{
gatewaySession = session;
return string.Equals(sessionId, session.SessionId, StringComparison.Ordinal);
return _sessions.TryGetValue(sessionId, out gatewaySession!);
}
public Task<WorkerCommandReply> InvokeAsync(
@@ -347,7 +395,7 @@ public sealed class EventStreamServiceTests
string sessionId,
CancellationToken cancellationToken)
{
return session.ReadEventsAsync(cancellationToken);
return _sessions[sessionId].ReadEventsAsync(cancellationToken);
}
public Task<SessionCloseResult> CloseSessionAsync(
@@ -221,6 +221,53 @@ public sealed class SessionManagerTests
Assert.Equal(1, snapshot.OpenSessions);
}
[Fact]
public async Task CloseSessionAsync_WhenSecondCloseIsCanceled_DoesNotRemoveSessionOwnedByFirstClose()
{
FakeWorkerClient workerClient = new()
{
BlockShutdown = true,
};
SessionRegistry registry = new();
using GatewayMetrics metrics = new();
SessionManager manager = CreateManager(
new FakeSessionWorkerClientFactory(workerClient),
registry,
metrics,
CreateOptions(maxSessions: 1));
GatewaySession session = await manager.OpenSessionAsync(
CreateOpenRequest(),
"client-1",
CancellationToken.None);
Task<SessionCloseResult> firstClose = manager.CloseSessionAsync(session.SessionId, CancellationToken.None);
await workerClient.WaitForShutdownStartAsync();
using CancellationTokenSource secondCloseCancellation = new();
Task<SessionCloseResult> secondClose = manager.CloseSessionAsync(
session.SessionId,
secondCloseCancellation.Token);
await secondCloseCancellation.CancelAsync();
await Assert.ThrowsAnyAsync<OperationCanceledException>(
async () => await secondClose);
Assert.True(manager.TryGetSession(session.SessionId, out _));
Assert.Equal(1, registry.Count);
Assert.Equal(0, workerClient.DisposeCount);
Assert.Equal(0, metrics.GetSnapshot().SessionsClosed);
Assert.Equal(1, metrics.GetSnapshot().OpenSessions);
workerClient.ReleaseShutdown();
SessionCloseResult closeResult = await firstClose;
Assert.Equal(SessionState.Closed, closeResult.FinalState);
Assert.False(manager.TryGetSession(session.SessionId, out _));
Assert.Equal(0, registry.Count);
Assert.Equal(1, workerClient.DisposeCount);
Assert.Equal(1, metrics.GetSnapshot().SessionsClosed);
Assert.Equal(0, metrics.GetSnapshot().OpenSessions);
}
[Fact]
public async Task OpenSessionAsync_WhenWorkerCreationFails_RemovesSessionFromRegistry()
{
@@ -405,10 +452,16 @@ public sealed class SessionManagerTests
public Exception? ShutdownException { get; init; }
public bool BlockShutdown { get; init; }
public WorkerCommand? LastCommand { get; private set; }
public WorkerCommandReply? InvokeReply { get; init; }
private TaskCompletionSource ShutdownStarted { get; } = new(TaskCreationOptions.RunContinuationsAsynchronously);
private TaskCompletionSource ShutdownReleased { get; } = new(TaskCreationOptions.RunContinuationsAsynchronously);
public Task StartAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
@@ -446,7 +499,7 @@ public sealed class SessionManagerTests
yield break;
}
public Task ShutdownAsync(
public async Task ShutdownAsync(
TimeSpan timeout,
CancellationToken cancellationToken)
{
@@ -456,8 +509,13 @@ public sealed class SessionManagerTests
throw ShutdownException;
}
if (BlockShutdown)
{
ShutdownStarted.TrySetResult();
await ShutdownReleased.Task.WaitAsync(cancellationToken);
}
State = WorkerClientState.Closed;
return Task.CompletedTask;
}
public void Kill(string reason)
@@ -471,5 +529,15 @@ public sealed class SessionManagerTests
DisposeCount++;
return ValueTask.CompletedTask;
}
public Task WaitForShutdownStartAsync()
{
return ShutdownStarted.Task.WaitAsync(TimeSpan.FromSeconds(5));
}
public void ReleaseShutdown()
{
ShutdownReleased.TrySetResult();
}
}
}