From 047d875fe699fbc9a6a10bebec17b8b3b4ff8329 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 28 Apr 2026 06:38:05 -0400 Subject: [PATCH] Fix remaining reliability findings --- prompt.md | 180 ------------------ .../Grpc/EventStreamService.cs | 19 +- .../Metrics/GatewayMetrics.cs | 8 + .../Sessions/GatewaySession.cs | 59 ++++-- .../Sessions/SessionCloseStartedException.cs | 11 ++ .../Sessions/SessionManager.cs | 6 +- .../Gateway/Grpc/EventStreamServiceTests.cs | 62 +++++- .../Gateway/Sessions/SessionManagerTests.cs | 72 ++++++- 8 files changed, 200 insertions(+), 217 deletions(-) delete mode 100644 prompt.md create mode 100644 src/MxGateway.Server/Sessions/SessionCloseStartedException.cs diff --git a/prompt.md b/prompt.md deleted file mode 100644 index 48ff2b6..0000000 --- a/prompt.md +++ /dev/null @@ -1,180 +0,0 @@ -# MXAccessGW Multi-Agent Orchestrator Prompt - -You are the implementation orchestrator for the Gitea repo: - -https://gitea.dohertylan.com/dohertj2/mxaccessgw - -Goal: run 3 background worker agents continuously until all open issues in all milestones are completed and all milestones are closed. - -## Operating Rules - -1. Start exactly 3 worker agents. -2. Each worker must use its own isolated git worktree and branch: - - `C:\Users\dohertj2\Desktop\mxaccessgw-agent-1` - - `C:\Users\dohertj2\Desktop\mxaccessgw-agent-2` - - `C:\Users\dohertj2\Desktop\mxaccessgw-agent-3` -3. Worker branch names must use this format: - - `agent-{N}/issue-{ISSUE_NUMBER}-{short-slug}` -4. Never assign the same issue to more than one worker. -5. Only assign issues whose Gitea dependencies are complete. -6. Keep Gitea as the source of truth for: - - issue state, - - issue dependencies, - - issue comments, - - issue assignment, - - milestone state. -7. Do not edit from the main repo worktree except for orchestration. -8. Do not let workers overwrite each other's branches or files. -9. Do not revert user changes. -10. Use short status updates every 30 seconds while running. - -## Issue Priority - -When multiple issues are unblocked, prefer issues in this order: - -1. Gateway milestones. -2. MXAccess worker milestones. -3. Client contract and fixture milestone. -4. Language client milestones. -5. Integration and parity milestone. - -Within the same priority tier, prefer: - -1. Lower issue number. -2. Issues that unblock the most downstream work. -3. Issues with `priority:p0`, then `priority:p1`, then lower priorities. - -## Assignment Loop - -Every 30 seconds: - -1. Check the status of all 3 worker agents. -2. If a worker is busy, leave it alone unless it reports a blocker or failure. -3. If a worker is idle, completed, failed, or blocked: - - collect its latest summary, - - inspect pushed changes or PR status if needed, - - update the relevant Gitea issue, - - assign the next unblocked issue if one exists. -4. Continue looping until: - - all Gitea issues are closed, - - all Gitea milestones are closed, - - or no unblocked work remains because of a real external blocker. - -## Before Assigning An Issue - -Before assigning an issue to a worker: - -1. Verify the issue is open. -2. Verify all Gitea dependencies are closed. -3. Verify no other worker is assigned to it or actively working it. -4. Assign the issue to the worker in Gitea when possible. -5. Add or ensure an `in-progress` label if the repo has one. -6. Post a short Gitea comment: - -```text -Worker {N} is taking this issue. - -Branch: agent-{N}/issue-{ISSUE_NUMBER}-{short-slug} -Worktree: C:\Users\dohertj2\Desktop\mxaccessgw-agent-{N} -``` - -## Worker Prompt Template - -Use this prompt when assigning work to each background worker: - -```text -You are worker {N} for mxaccessgw. - -Repo: https://gitea.dohertylan.com/dohertj2/mxaccessgw -Worktree: C:\Users\dohertj2\Desktop\mxaccessgw-agent-{N} -Branch: agent-{N}/issue-{ISSUE_NUMBER}-{SLUG} -Assigned issue: #{ISSUE_NUMBER} - -You are not alone in the codebase. Other workers may be changing nearby files on different branches. Do not revert unrelated changes. Keep your work scoped to this issue. - -Tasks: -1. Read the issue body, acceptance criteria, and relevant docs under docs/. -2. Read AGENTS.md before editing. -3. Create or update your isolated worktree. -4. Check out your assigned branch. -5. Implement the issue fully. -6. Add or update focused tests. -7. Run the relevant build and test commands. -8. Commit your changes with a message like: - `Issue #{ISSUE_NUMBER}: {short summary}` -9. Push your branch. -10. Open a PR when the Gitea tooling is available. If not, clearly report the pushed branch. -11. Report: - - files changed, - - tests run, - - test results, - - commit hash, - - branch name, - - PR link if created, - - blockers or follow-up issues. -``` - -## Worker Completion Handling - -When a worker reports completion: - -1. Inspect the worker's summary. -2. Verify the branch exists and has the expected commit. -3. Inspect the diff if the change is non-trivial. -4. Verify the acceptance criteria from the Gitea issue. -5. Verify relevant build and test output. -6. If the work is complete: - - comment on the issue with the implementation summary, - - include the commit hash and PR link or pushed branch, - - include tests run and results, - - note any known limitations, - - close the Gitea issue. -7. If the work is incomplete or tests failed: - - leave the issue open, - - comment with the remaining work, - - either reassign the issue to the same worker or return it to the unblocked queue. - -Do not close an issue just because a worker says it is done. Close it only after verifying the acceptance criteria and test results. - -## Milestone Completion Handling - -After every worker completion cycle: - -1. Check each open milestone. -2. List all issues assigned to that milestone. -3. If any issue in the milestone remains open, leave the milestone open. -4. If every issue in the milestone is closed: - - post a short milestone-summary comment on the final issue closed in that milestone, - - include completed issue numbers, - - include relevant PRs or pushed branches, - - include final verification command results, - - close the milestone in Gitea. - -Do not close a milestone while any issue in it remains open. - -## Blocker Handling - -If no unblocked issues are available: - -1. Re-check Gitea dependencies. -2. Re-check whether any worker has completed work that unlocks more issues. -3. Re-check all open milestones. -4. If work is genuinely blocked: - - post a concise status update, - - list the blocked issues, - - list the dependencies or external decisions preventing progress, - - keep checking every 30 seconds unless instructed to stop. - -## Final Done Criteria - -The orchestration is complete only when: - -1. All Gitea issues in the repo are closed. -2. All Gitea milestones in the repo are closed. -3. All worker branches are either merged, PR'd, or clearly reported. -4. The final status report includes: - - closed issue count, - - closed milestone count, - - PRs or branches produced, - - final verification commands and results, - - any residual risks or follow-up notes. diff --git a/src/MxGateway.Server/Grpc/EventStreamService.cs b/src/MxGateway.Server/Grpc/EventStreamService.cs index db17452..31e965a 100644 --- a/src/MxGateway.Server/Grpc/EventStreamService.cs +++ b/src/MxGateway.Server/Grpc/EventStreamService.cs @@ -46,8 +46,8 @@ public sealed class EventStreamService( eventQueue.Writer, () => { - int depth = Interlocked.Increment(ref streamQueueDepth); - metrics.SetGrpcEventStreamQueueDepth(depth); + Interlocked.Increment(ref streamQueueDepth); + metrics.AdjustGrpcEventStreamQueueDepth(1); }, streamCts.Token); @@ -55,8 +55,8 @@ public sealed class EventStreamService( { await foreach (MxEvent mxEvent in eventQueue.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false)) { - int depth = Math.Max(0, Interlocked.Decrement(ref streamQueueDepth)); - metrics.SetGrpcEventStreamQueueDepth(depth); + Interlocked.Decrement(ref streamQueueDepth); + metrics.AdjustGrpcEventStreamQueueDepth(-1); yield return mxEvent; } @@ -66,9 +66,6 @@ public sealed class EventStreamService( { await streamCts.CancelAsync().ConfigureAwait(false); subscriber.Dispose(); - Interlocked.Exchange(ref streamQueueDepth, 0); - metrics.SetGrpcEventStreamQueueDepth(0); - metrics.StreamDisconnected("Detached"); try { @@ -84,6 +81,14 @@ public sealed class EventStreamService( "Event stream producer stopped for session {SessionId}.", request.SessionId); } + + int remainingDepth = Interlocked.Exchange(ref streamQueueDepth, 0); + if (remainingDepth > 0) + { + metrics.AdjustGrpcEventStreamQueueDepth(-remainingDepth); + } + + metrics.StreamDisconnected("Detached"); } } diff --git a/src/MxGateway.Server/Metrics/GatewayMetrics.cs b/src/MxGateway.Server/Metrics/GatewayMetrics.cs index 928d379..ab78cae 100644 --- a/src/MxGateway.Server/Metrics/GatewayMetrics.cs +++ b/src/MxGateway.Server/Metrics/GatewayMetrics.cs @@ -232,6 +232,14 @@ public sealed class GatewayMetrics : IDisposable } } + public void AdjustGrpcEventStreamQueueDepth(int delta) + { + lock (_syncRoot) + { + _grpcEventStreamQueueDepth = Math.Max(0, _grpcEventStreamQueueDepth + delta); + } + } + public void RemoveSessionEvents(string sessionId) { _eventsBySession.TryRemove(sessionId, out _); diff --git a/src/MxGateway.Server/Sessions/GatewaySession.cs b/src/MxGateway.Server/Sessions/GatewaySession.cs index 6cd4557..b0a4427 100644 --- a/src/MxGateway.Server/Sessions/GatewaySession.cs +++ b/src/MxGateway.Server/Sessions/GatewaySession.cs @@ -376,30 +376,49 @@ public sealed class GatewaySession await _closeLock.WaitAsync(cancellationToken).ConfigureAwait(false); try { - if (_state is SessionState.Closed) + try { - return new SessionCloseResult(SessionId, SessionState.Closed, AlreadyClosed: true); + if (_state is SessionState.Closed) + { + return new SessionCloseResult(SessionId, SessionState.Closed, AlreadyClosed: true); + } + + bool alreadyClosing = _closeStarted; + _closeStarted = true; + _state = SessionState.Closing; + + if (_workerClient is not null) + { + try + { + await _workerClient.ShutdownAsync(ShutdownTimeout, cancellationToken).ConfigureAwait(false); + } + catch (Exception exception) + { + try + { + _workerClient.Kill(reason); + } + catch (Exception killException) + { + throw new SessionCloseStartedException( + $"Session {SessionId} close failed after worker shutdown started.", + new AggregateException(exception, killException)); + } + + throw; + } + } + + _state = SessionState.Closed; + return new SessionCloseResult(SessionId, SessionState.Closed, alreadyClosing); } - - bool alreadyClosing = _closeStarted; - _closeStarted = true; - _state = SessionState.Closing; - - if (_workerClient is not null) + catch (Exception exception) when (exception is not SessionCloseStartedException) { - try - { - await _workerClient.ShutdownAsync(ShutdownTimeout, cancellationToken).ConfigureAwait(false); - } - catch - { - _workerClient.Kill(reason); - throw; - } + throw new SessionCloseStartedException( + $"Session {SessionId} close failed after the close lock was acquired.", + exception); } - - _state = SessionState.Closed; - return new SessionCloseResult(SessionId, SessionState.Closed, alreadyClosing); } finally { diff --git a/src/MxGateway.Server/Sessions/SessionCloseStartedException.cs b/src/MxGateway.Server/Sessions/SessionCloseStartedException.cs new file mode 100644 index 0000000..cb46247 --- /dev/null +++ b/src/MxGateway.Server/Sessions/SessionCloseStartedException.cs @@ -0,0 +1,11 @@ +namespace MxGateway.Server.Sessions; + +internal sealed class SessionCloseStartedException : Exception +{ + public SessionCloseStartedException( + string message, + Exception innerException) + : base(message, innerException) + { + } +} diff --git a/src/MxGateway.Server/Sessions/SessionManager.cs b/src/MxGateway.Server/Sessions/SessionManager.cs index 694e639..96a2715 100644 --- a/src/MxGateway.Server/Sessions/SessionManager.cs +++ b/src/MxGateway.Server/Sessions/SessionManager.cs @@ -210,7 +210,11 @@ public sealed class SessionManager : ISessionManager await RemoveSessionAsync(session).ConfigureAwait(false); return result; } - catch (Exception exception) + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + throw; + } + catch (SessionCloseStartedException exception) { session.MarkFaulted(exception.Message); if (!wasClosed) diff --git a/src/MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs b/src/MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs index ff38b45..428b672 100644 --- a/src/MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs +++ b/src/MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs @@ -111,6 +111,46 @@ public sealed class EventStreamServiceTests await WaitUntilAsync(() => metrics.GetSnapshot().GrpcEventStreamQueueDepth == 0); } + [Fact] + public async Task StreamEventsAsync_WithConcurrentStreams_TracksAggregateQueueDepth() + { + FakeWorkerClient firstWorkerClient = new(); + FakeWorkerClient secondWorkerClient = new(); + GatewaySession firstSession = CreateReadySession(firstWorkerClient, "session-events-1"); + GatewaySession secondSession = CreateReadySession(secondWorkerClient, "session-events-2"); + using GatewayMetrics metrics = new(); + EventStreamService service = CreateService( + new FakeSessionManager(firstSession, secondSession), + metrics, + queueCapacity: 8); + for (ulong sequence = 1; sequence <= 3; sequence++) + { + firstWorkerClient.Events.Add(CreateWorkerEvent(sequence, MxEventFamily.OnDataChange)); + secondWorkerClient.Events.Add(CreateWorkerEvent(sequence, MxEventFamily.OnDataChange)); + } + + firstWorkerClient.CompleteAfterConfiguredEvents = true; + secondWorkerClient.CompleteAfterConfiguredEvents = true; + await using IAsyncEnumerator firstSubscriber = service + .StreamEventsAsync(CreateRequest(firstSession.SessionId), CancellationToken.None) + .GetAsyncEnumerator(); + await using IAsyncEnumerator secondSubscriber = service + .StreamEventsAsync(CreateRequest(secondSession.SessionId), CancellationToken.None) + .GetAsyncEnumerator(); + + Assert.True(await firstSubscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout)); + Assert.True(await secondSubscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout)); + await WaitUntilAsync(() => metrics.GetSnapshot().GrpcEventStreamQueueDepth == 4); + + await firstSubscriber.DisposeAsync(); + + await WaitUntilAsync(() => metrics.GetSnapshot().GrpcEventStreamQueueDepth == 2); + + await secondSubscriber.DisposeAsync(); + + await WaitUntilAsync(() => metrics.GetSnapshot().GrpcEventStreamQueueDepth == 0); + } + [Fact] public async Task StreamEventsAsync_WhenStreamQueueOverflows_FaultsSessionAndReportsOverflow() { @@ -255,10 +295,12 @@ public sealed class EventStreamServiceTests }; } - private static GatewaySession CreateReadySession(FakeWorkerClient workerClient) + private static GatewaySession CreateReadySession( + FakeWorkerClient workerClient, + string sessionId = "session-events") { GatewaySession session = new( - "session-events", + sessionId, GatewayContractInfo.DefaultBackendName, "pipe", "nonce", @@ -317,22 +359,28 @@ public sealed class EventStreamServiceTests } } - private sealed class FakeSessionManager(GatewaySession session) : ISessionManager + private sealed class FakeSessionManager : ISessionManager { + private readonly IReadOnlyDictionary _sessions; + + public FakeSessionManager(params GatewaySession[] sessions) + { + _sessions = sessions.ToDictionary(session => session.SessionId, StringComparer.Ordinal); + } + public Task OpenSessionAsync( SessionOpenRequest request, string? clientIdentity, CancellationToken cancellationToken) { - return Task.FromResult(session); + return Task.FromResult(_sessions.Values.First()); } public bool TryGetSession( string sessionId, out GatewaySession gatewaySession) { - gatewaySession = session; - return string.Equals(sessionId, session.SessionId, StringComparison.Ordinal); + return _sessions.TryGetValue(sessionId, out gatewaySession!); } public Task InvokeAsync( @@ -347,7 +395,7 @@ public sealed class EventStreamServiceTests string sessionId, CancellationToken cancellationToken) { - return session.ReadEventsAsync(cancellationToken); + return _sessions[sessionId].ReadEventsAsync(cancellationToken); } public Task CloseSessionAsync( diff --git a/src/MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs b/src/MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs index 52efdd3..fa5c90a 100644 --- a/src/MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs +++ b/src/MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs @@ -221,6 +221,53 @@ public sealed class SessionManagerTests Assert.Equal(1, snapshot.OpenSessions); } + [Fact] + public async Task CloseSessionAsync_WhenSecondCloseIsCanceled_DoesNotRemoveSessionOwnedByFirstClose() + { + FakeWorkerClient workerClient = new() + { + BlockShutdown = true, + }; + SessionRegistry registry = new(); + using GatewayMetrics metrics = new(); + SessionManager manager = CreateManager( + new FakeSessionWorkerClientFactory(workerClient), + registry, + metrics, + CreateOptions(maxSessions: 1)); + GatewaySession session = await manager.OpenSessionAsync( + CreateOpenRequest(), + "client-1", + CancellationToken.None); + + Task firstClose = manager.CloseSessionAsync(session.SessionId, CancellationToken.None); + await workerClient.WaitForShutdownStartAsync(); + using CancellationTokenSource secondCloseCancellation = new(); + Task secondClose = manager.CloseSessionAsync( + session.SessionId, + secondCloseCancellation.Token); + + await secondCloseCancellation.CancelAsync(); + + await Assert.ThrowsAnyAsync( + async () => await secondClose); + Assert.True(manager.TryGetSession(session.SessionId, out _)); + Assert.Equal(1, registry.Count); + Assert.Equal(0, workerClient.DisposeCount); + Assert.Equal(0, metrics.GetSnapshot().SessionsClosed); + Assert.Equal(1, metrics.GetSnapshot().OpenSessions); + + workerClient.ReleaseShutdown(); + SessionCloseResult closeResult = await firstClose; + + Assert.Equal(SessionState.Closed, closeResult.FinalState); + Assert.False(manager.TryGetSession(session.SessionId, out _)); + Assert.Equal(0, registry.Count); + Assert.Equal(1, workerClient.DisposeCount); + Assert.Equal(1, metrics.GetSnapshot().SessionsClosed); + Assert.Equal(0, metrics.GetSnapshot().OpenSessions); + } + [Fact] public async Task OpenSessionAsync_WhenWorkerCreationFails_RemovesSessionFromRegistry() { @@ -405,10 +452,16 @@ public sealed class SessionManagerTests public Exception? ShutdownException { get; init; } + public bool BlockShutdown { get; init; } + public WorkerCommand? LastCommand { get; private set; } public WorkerCommandReply? InvokeReply { get; init; } + private TaskCompletionSource ShutdownStarted { get; } = new(TaskCreationOptions.RunContinuationsAsynchronously); + + private TaskCompletionSource ShutdownReleased { get; } = new(TaskCreationOptions.RunContinuationsAsynchronously); + public Task StartAsync(CancellationToken cancellationToken) { return Task.CompletedTask; @@ -446,7 +499,7 @@ public sealed class SessionManagerTests yield break; } - public Task ShutdownAsync( + public async Task ShutdownAsync( TimeSpan timeout, CancellationToken cancellationToken) { @@ -456,8 +509,13 @@ public sealed class SessionManagerTests throw ShutdownException; } + if (BlockShutdown) + { + ShutdownStarted.TrySetResult(); + await ShutdownReleased.Task.WaitAsync(cancellationToken); + } + State = WorkerClientState.Closed; - return Task.CompletedTask; } public void Kill(string reason) @@ -471,5 +529,15 @@ public sealed class SessionManagerTests DisposeCount++; return ValueTask.CompletedTask; } + + public Task WaitForShutdownStartAsync() + { + return ShutdownStarted.Task.WaitAsync(TimeSpan.FromSeconds(5)); + } + + public void ReleaseShutdown() + { + ShutdownReleased.TrySetResult(); + } } }