Fix Galaxy paging review findings

This commit is contained in:
Joseph Doherty
2026-04-29 11:59:49 -04:00
parent d543679044
commit ac2787f619
14 changed files with 261 additions and 22 deletions
@@ -814,9 +814,7 @@ public static class MxGatewayClientCli
TextWriter output,
CancellationToken cancellationToken)
{
DiscoverHierarchyReply reply = await client.GalaxyDiscoverHierarchyAsync(
new DiscoverHierarchyRequest(),
cancellationToken)
DiscoverHierarchyReply reply = await DiscoverAllGalaxyHierarchyAsync(client, cancellationToken)
.ConfigureAwait(false);
if (arguments.HasFlag("json"))
@@ -834,6 +832,39 @@ public static class MxGatewayClientCli
return 0;
}
private static async Task<DiscoverHierarchyReply> DiscoverAllGalaxyHierarchyAsync(
IMxGatewayCliClient client,
CancellationToken cancellationToken)
{
DiscoverHierarchyReply aggregate = new();
HashSet<string> seenPageTokens = new(StringComparer.Ordinal);
string pageToken = string.Empty;
do
{
DiscoverHierarchyReply page = await client.GalaxyDiscoverHierarchyAsync(
new DiscoverHierarchyRequest
{
PageSize = 5000,
PageToken = pageToken,
},
cancellationToken)
.ConfigureAwait(false);
aggregate.Objects.Add(page.Objects);
aggregate.TotalObjectCount = page.TotalObjectCount;
pageToken = page.NextPageToken;
if (!string.IsNullOrWhiteSpace(pageToken)
&& !seenPageTokens.Add(pageToken))
{
throw new MxGatewayException(
$"Galaxy DiscoverHierarchy returned a repeated page token '{pageToken}'.");
}
}
while (!string.IsNullOrWhiteSpace(pageToken));
return aggregate;
}
private static async Task<int> GalaxyWatchAsync(
CliArguments arguments,
IMxGatewayCliClient client,
@@ -140,6 +140,26 @@ public sealed class GalaxyRepositoryClientTests
Assert.False(call.CallOptions.CancellationToken.IsCancellationRequested);
}
[Fact]
public async Task DiscoverHierarchyAsync_WithRepeatedPageToken_ThrowsProtocolError()
{
FakeGalaxyRepositoryTransport transport = CreateTransport();
transport.DiscoverHierarchyReplies.Enqueue(new DiscoverHierarchyReply
{
NextPageToken = "7:1",
});
transport.DiscoverHierarchyReplies.Enqueue(new DiscoverHierarchyReply
{
NextPageToken = "7:1",
});
await using GalaxyRepositoryClient client = CreateClient(transport);
MxGatewayException exception = await Assert.ThrowsAsync<MxGatewayException>(
async () => await client.DiscoverHierarchyAsync());
Assert.Contains("repeated page token", exception.Message, StringComparison.Ordinal);
}
[Fact]
public async Task TestConnectionAsync_RetriesOnTransientGrpcFailure()
{
@@ -207,8 +207,10 @@ public sealed class MxGatewayClientCliTests
using var output = new StringWriter();
using var error = new StringWriter();
FakeCliClient fakeClient = new();
fakeClient.GalaxyDiscoverHierarchyReply = new DiscoverHierarchyReply
fakeClient.GalaxyDiscoverHierarchyReplies.Enqueue(new DiscoverHierarchyReply
{
NextPageToken = "7:1",
TotalObjectCount = 2,
Objects =
{
new GalaxyObject
@@ -227,7 +229,21 @@ public sealed class MxGatewayClientCliTests
},
},
},
};
});
fakeClient.GalaxyDiscoverHierarchyReplies.Enqueue(new DiscoverHierarchyReply
{
TotalObjectCount = 2,
Objects =
{
new GalaxyObject
{
GobjectId = 8,
TagName = "DelmiaReceiver_002",
ContainedName = "DelmiaReceiver",
ParentGobjectId = 1,
},
},
});
int exitCode = await MxGatewayClientCli.RunAsync(
[
@@ -242,10 +258,14 @@ public sealed class MxGatewayClientCliTests
_ => fakeClient);
Assert.Equal(0, exitCode);
Assert.Single(fakeClient.GalaxyDiscoverHierarchyRequests);
Assert.Equal(2, fakeClient.GalaxyDiscoverHierarchyRequests.Count);
Assert.Equal(5000, fakeClient.GalaxyDiscoverHierarchyRequests[0].PageSize);
Assert.Equal("", fakeClient.GalaxyDiscoverHierarchyRequests[0].PageToken);
Assert.Equal("7:1", fakeClient.GalaxyDiscoverHierarchyRequests[1].PageToken);
string text = output.ToString();
Assert.Contains("objects=1", text);
Assert.Contains("objects=2", text);
Assert.Contains("DelmiaReceiver_001", text);
Assert.Contains("DelmiaReceiver_002", text);
Assert.Contains("attributes=1", text);
Assert.Equal(string.Empty, error.ToString());
}
@@ -411,6 +431,8 @@ public sealed class MxGatewayClientCliTests
public DiscoverHierarchyReply GalaxyDiscoverHierarchyReply { get; set; } = new();
public Queue<DiscoverHierarchyReply> GalaxyDiscoverHierarchyReplies { get; } = new();
public List<TestConnectionRequest> GalaxyTestConnectionRequests { get; } = [];
public List<GetLastDeployTimeRequest> GalaxyGetLastDeployTimeRequests { get; } = [];
@@ -438,7 +460,10 @@ public sealed class MxGatewayClientCliTests
CancellationToken cancellationToken)
{
GalaxyDiscoverHierarchyRequests.Add(request);
return Task.FromResult(GalaxyDiscoverHierarchyReply);
return Task.FromResult(
GalaxyDiscoverHierarchyReplies.TryDequeue(out DiscoverHierarchyReply? reply)
? reply
: GalaxyDiscoverHierarchyReply);
}
public List<WatchDeployEventsRequest> GalaxyWatchDeployEventsRequests { get; } = [];
@@ -146,6 +146,7 @@ public sealed class GalaxyRepositoryClient : IAsyncDisposable
public async Task<IReadOnlyList<GalaxyObject>> DiscoverHierarchyAsync(CancellationToken cancellationToken = default)
{
List<GalaxyObject> objects = [];
HashSet<string> seenPageTokens = new(StringComparer.Ordinal);
string pageToken = string.Empty;
do
{
@@ -160,6 +161,12 @@ public sealed class GalaxyRepositoryClient : IAsyncDisposable
objects.AddRange(reply.Objects);
pageToken = reply.NextPageToken;
if (!string.IsNullOrWhiteSpace(pageToken)
&& !seenPageTokens.Add(pageToken))
{
throw new MxGatewayException(
$"Galaxy DiscoverHierarchy returned a repeated page token '{pageToken}'.");
}
}
while (!string.IsNullOrWhiteSpace(pageToken));
+6
View File
@@ -3,6 +3,7 @@ package mxgateway
import (
"context"
"errors"
"fmt"
"io"
"time"
@@ -148,6 +149,7 @@ func (c *GalaxyClient) DiscoverHierarchy(ctx context.Context) ([]*GalaxyObject,
defer cancel()
var objects []*GalaxyObject
seenPageTokens := make(map[string]struct{})
pageToken := ""
for {
reply, err := c.raw.DiscoverHierarchy(callCtx, &pb.DiscoverHierarchyRequest{
@@ -162,6 +164,10 @@ func (c *GalaxyClient) DiscoverHierarchy(ctx context.Context) ([]*GalaxyObject,
if pageToken == "" {
break
}
if _, seen := seenPageTokens[pageToken]; seen {
return nil, fmt.Errorf("mxgateway: galaxy discover hierarchy returned repeated page token %q", pageToken)
}
seenPageTokens[pageToken] = struct{}{}
}
return objects, nil
}
+20
View File
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"net"
"strings"
"testing"
"time"
@@ -159,6 +160,25 @@ func TestGalaxyDiscoverHierarchyReturnsObjects(t *testing.T) {
}
}
func TestGalaxyDiscoverHierarchyRejectsRepeatedPageToken(t *testing.T) {
fake := &fakeGalaxyServer{
discoverReplies: []*pb.DiscoverHierarchyReply{
{NextPageToken: "7:1"},
{NextPageToken: "7:1"},
},
}
client, cleanup := newGalaxyBufconnClient(t, fake)
defer cleanup()
_, err := client.DiscoverHierarchy(context.Background())
if err == nil {
t.Fatal("DiscoverHierarchy() error = nil, want repeated token error")
}
if !strings.Contains(err.Error(), "repeated page token") {
t.Fatalf("error = %v, want repeated page token", err)
}
}
func TestGalaxyDialReturnsGatewayErrorOnRpcFailure(t *testing.T) {
fake := &fakeGalaxyServer{failTest: true}
client, cleanup := newGalaxyBufconnClient(t, fake)
@@ -133,6 +133,7 @@ public final class GalaxyRepositoryClient implements AutoCloseable {
public List<GalaxyObject> discoverHierarchy() {
try {
java.util.ArrayList<GalaxyObject> objects = new java.util.ArrayList<>();
java.util.HashSet<String> seenPageTokens = new java.util.HashSet<>();
String pageToken = "";
do {
DiscoverHierarchyReply reply = rawBlockingStub().discoverHierarchy(DiscoverHierarchyRequest.newBuilder()
@@ -141,6 +142,10 @@ public final class GalaxyRepositoryClient implements AutoCloseable {
.build());
objects.addAll(reply.getObjectsList());
pageToken = reply.getNextPageToken();
if (!pageToken.isBlank() && !seenPageTokens.add(pageToken)) {
throw new MxGatewayException(
"galaxy discover hierarchy returned repeated page token: " + pageToken);
}
} while (!pageToken.isBlank());
return objects;
} catch (RuntimeException error) {
@@ -152,7 +157,7 @@ public final class GalaxyRepositoryClient implements AutoCloseable {
}
public CompletableFuture<List<GalaxyObject>> discoverHierarchyAsync() {
return discoverHierarchyPageAsync("", new java.util.ArrayList<>());
return discoverHierarchyPageAsync("", new java.util.ArrayList<>(), new java.util.HashSet<>());
}
/**
@@ -268,7 +273,7 @@ public final class GalaxyRepositoryClient implements AutoCloseable {
}
private CompletableFuture<List<GalaxyObject>> discoverHierarchyPageAsync(
String pageToken, java.util.ArrayList<GalaxyObject> objects) {
String pageToken, java.util.ArrayList<GalaxyObject> objects, java.util.HashSet<String> seenPageTokens) {
DiscoverHierarchyRequest request = DiscoverHierarchyRequest.newBuilder()
.setPageSize(DISCOVER_HIERARCHY_PAGE_SIZE)
.setPageToken(pageToken)
@@ -278,7 +283,13 @@ public final class GalaxyRepositoryClient implements AutoCloseable {
if (reply.getNextPageToken().isBlank()) {
return CompletableFuture.completedFuture(objects);
}
return discoverHierarchyPageAsync(reply.getNextPageToken(), objects);
if (!seenPageTokens.add(reply.getNextPageToken())) {
CompletableFuture<List<GalaxyObject>> failed = new CompletableFuture<>();
failed.completeExceptionally(new MxGatewayException(
"galaxy discover hierarchy returned repeated page token: " + reply.getNextPageToken()));
return failed;
}
return discoverHierarchyPageAsync(reply.getNextPageToken(), objects, seenPageTokens);
});
}
@@ -3,6 +3,7 @@ package com.dohertylan.mxgateway.client;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.google.protobuf.Timestamp;
@@ -174,6 +175,27 @@ final class GalaxyRepositoryClientTests {
assertFalse(stream.hasNext());
}
@Test
void discoverHierarchyRejectsRepeatedPageToken() throws Exception {
TestService service = new TestService() {
@Override
public void discoverHierarchy(
DiscoverHierarchyRequest request, StreamObserver<DiscoverHierarchyReply> responseObserver) {
responseObserver.onNext(DiscoverHierarchyReply.newBuilder()
.setNextPageToken("7:1")
.build());
responseObserver.onCompleted();
}
};
try (InProcessGalaxy g = InProcessGalaxy.start(service, new AtomicReference<>());
GalaxyRepositoryClient client = g.client("")) {
MxGatewayException error = assertThrows(MxGatewayException.class, client::discoverHierarchy);
assertTrue(error.getMessage().contains("repeated page token"));
}
}
@Test
void watchDeployEventsReceivesEventsInOrder() throws Exception {
DeployEvent first = DeployEvent.newBuilder()
+7 -1
View File
@@ -18,7 +18,7 @@ import grpc
from google.protobuf.timestamp_pb2 import Timestamp
from .auth import merge_metadata
from .errors import map_rpc_error
from .errors import MxGatewayError, map_rpc_error
from .generated import galaxy_repository_pb2 as galaxy_pb
from .generated import galaxy_repository_pb2_grpc as galaxy_pb_grpc
from .options import ClientOptions, create_channel
@@ -115,6 +115,7 @@ class GalaxyRepositoryClient:
"""Return the deployed Galaxy object hierarchy as raw proto messages."""
objects: list[galaxy_pb.GalaxyObject] = []
seen_page_tokens: set[str] = set()
page_token = ""
while True:
reply = await self._unary(
@@ -129,6 +130,11 @@ class GalaxyRepositoryClient:
page_token = reply.next_page_token
if not page_token:
return objects
if page_token in seen_page_tokens:
raise MxGatewayError(
f"galaxy discover hierarchy returned repeated page token {page_token!r}"
)
seen_page_tokens.add(page_token)
def watch_deploy_events(
self,
+16
View File
@@ -148,6 +148,22 @@ async def test_discover_hierarchy_returns_proto_objects() -> None:
assert objects[1].attributes[0].full_tag_reference == "DelmiaReceiver_001.DownloadPath"
@pytest.mark.asyncio
async def test_discover_hierarchy_rejects_repeated_page_token() -> None:
stub = FakeGalaxyStub()
stub.discover_hierarchy.replies = [
galaxy_pb.DiscoverHierarchyReply(next_page_token="7:1"),
galaxy_pb.DiscoverHierarchyReply(next_page_token="7:1"),
]
client = await GalaxyRepositoryClient.connect(
ClientOptions(endpoint="fake", plaintext=True),
stub=stub,
)
with pytest.raises(Exception, match="repeated page token"):
await client.discover_hierarchy()
@pytest.mark.asyncio
async def test_watch_deploy_events_yields_events_in_order() -> None:
ts1 = Timestamp()
+40
View File
@@ -144,6 +144,7 @@ impl GalaxyClient {
/// the object's identifying names plus its dynamic attributes.
pub async fn discover_hierarchy(&mut self) -> Result<Vec<GalaxyObject>, Error> {
let mut objects = Vec::new();
let mut seen_page_tokens = std::collections::HashSet::new();
let mut page_token = String::new();
loop {
let response = self
@@ -159,6 +160,14 @@ impl GalaxyClient {
if page_token.is_empty() {
return Ok(objects);
}
if !seen_page_tokens.insert(page_token.clone()) {
return Err(Error::InvalidArgument {
name: "page_token".to_owned(),
detail: format!(
"galaxy discover hierarchy returned repeated page token `{page_token}`"
),
});
}
}
}
@@ -517,6 +526,37 @@ mod tests {
);
}
#[tokio::test]
async fn discover_hierarchy_rejects_repeated_page_token() {
let state = Arc::new(FakeState::default());
state
.discover_replies
.lock()
.unwrap()
.push_back(DiscoverHierarchyReply {
objects: Vec::new(),
next_page_token: "7:1".to_owned(),
total_object_count: 1,
});
state
.discover_replies
.lock()
.unwrap()
.push_back(DiscoverHierarchyReply {
objects: Vec::new(),
next_page_token: "7:1".to_owned(),
total_object_count: 1,
});
let endpoint = spawn_fake(state).await;
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let error = client.discover_hierarchy().await.unwrap_err();
assert!(error.to_string().contains("repeated page token"));
}
#[tokio::test]
async fn watch_deploy_events_yields_events_in_order() {
let state = Arc::new(FakeState::default());
@@ -99,6 +99,14 @@ public sealed class GalaxyHierarchyCache : IGalaxyHierarchyCache
LastQueriedAt = queriedAt,
LastSuccessAt = queriedAt,
LastError = null,
DashboardSummary = previous.DashboardSummary with
{
Status = DashboardGalaxyStatus.Healthy,
LastQueriedAt = queriedAt,
LastSuccessAt = queriedAt,
LastDeployTime = deployTime,
LastError = null,
},
};
Volatile.Write(ref _current, refreshed);
_firstLoad.TrySetResult();
@@ -68,7 +68,8 @@ public sealed class GalaxyRepositoryGrpcService(
ResolveUnavailableMessage(entry)));
}
int offset = ParsePageToken(request.PageToken);
PageToken pageToken = ParsePageToken(request.PageToken, entry.Sequence);
int offset = pageToken.Offset;
if (offset > entry.Objects.Count)
{
throw new RpcException(new Status(
@@ -90,7 +91,7 @@ public sealed class GalaxyRepositoryGrpcService(
int nextOffset = offset + take;
if (nextOffset < entry.Objects.Count)
{
reply.NextPageToken = nextOffset.ToString(System.Globalization.CultureInfo.InvariantCulture);
reply.NextPageToken = FormatPageToken(entry.Sequence, nextOffset);
}
return reply;
@@ -182,15 +183,30 @@ public sealed class GalaxyRepositoryGrpcService(
return Math.Min(pageSize, MaxDiscoverPageSize);
}
private static int ParsePageToken(string pageToken)
private static string FormatPageToken(long sequence, int offset)
{
return string.Concat(
sequence.ToString(System.Globalization.CultureInfo.InvariantCulture),
":",
offset.ToString(System.Globalization.CultureInfo.InvariantCulture));
}
private static PageToken ParsePageToken(string pageToken, long currentSequence)
{
if (string.IsNullOrWhiteSpace(pageToken))
{
return 0;
return new PageToken(currentSequence, Offset: 0);
}
if (!int.TryParse(
pageToken,
string[] parts = pageToken.Split(':', count: 2);
if (parts.Length != 2
|| !long.TryParse(
parts[0],
System.Globalization.NumberStyles.None,
System.Globalization.CultureInfo.InvariantCulture,
out long sequence)
|| !int.TryParse(
parts[1],
System.Globalization.NumberStyles.None,
System.Globalization.CultureInfo.InvariantCulture,
out int offset)
@@ -201,9 +217,18 @@ public sealed class GalaxyRepositoryGrpcService(
"DiscoverHierarchy page_token is invalid."));
}
return offset;
if (sequence != currentSequence)
{
throw new RpcException(new Status(
StatusCode.InvalidArgument,
"DiscoverHierarchy page_token is stale."));
}
return new PageToken(sequence, offset);
}
private sealed record PageToken(long Sequence, int Offset);
[System.Diagnostics.CodeAnalysis.SuppressMessage(
"Style",
"IDE0051:Remove unused private members",
@@ -24,7 +24,7 @@ public sealed class GalaxyRepositoryGrpcServiceTests
Assert.Equal(2, reply.Objects.Count);
Assert.Equal("Object_001", reply.Objects[0].TagName);
Assert.Equal("Object_002", reply.Objects[1].TagName);
Assert.Equal("2", reply.NextPageToken);
Assert.Equal("7:2", reply.NextPageToken);
Assert.Equal(3, reply.TotalObjectCount);
}
@@ -37,7 +37,7 @@ public sealed class GalaxyRepositoryGrpcServiceTests
new DiscoverHierarchyRequest
{
PageSize = 2,
PageToken = "2",
PageToken = "7:2",
},
new TestServerCallContext());
@@ -50,7 +50,8 @@ public sealed class GalaxyRepositoryGrpcServiceTests
[Theory]
[InlineData("-1", 1)]
[InlineData("not-an-offset", 1)]
[InlineData("4", 1)]
[InlineData("7:4", 1)]
[InlineData("6:2", 1)]
[InlineData("", -1)]
public async Task DiscoverHierarchy_WithInvalidPagingArguments_ReturnsInvalidArgument(
string pageToken,
@@ -88,6 +89,7 @@ public sealed class GalaxyRepositoryGrpcServiceTests
return GalaxyHierarchyCacheEntry.Empty with
{
Status = GalaxyCacheStatus.Healthy,
Sequence = 7,
LastSuccessAt = DateTimeOffset.UtcNow,
Objects = objects,
DashboardSummary = DashboardGalaxySummary.Unknown with