Compare commits

...

8 Commits

Author SHA1 Message Date
Joseph Doherty b4bc2df015 client/java: LazyBrowseNode walker for lazy hierarchy browse 2026-05-28 14:29:15 -04:00
Joseph Doherty fd2a0ac4c7 client/go: LazyBrowseNode walker for lazy hierarchy browse 2026-05-28 14:26:41 -04:00
Joseph Doherty 555e4be51f client/rust: LazyBrowseNode walker for lazy hierarchy browse 2026-05-28 14:26:05 -04:00
Joseph Doherty 1d8c0d83c4 client/python: LazyBrowseNode walker for lazy hierarchy browse 2026-05-28 14:24:23 -04:00
Joseph Doherty 6600f2a7bd client/dotnet: LazyBrowseNode walker for lazy hierarchy browse 2026-05-28 14:24:17 -04:00
Joseph Doherty 803a207ad2 client/java: regenerate protos for BrowseChildren
Regen'd from galaxy_repository.proto after BrowseChildren RPC was added.
GalaxyRepositoryGrpc and GalaxyRepositoryOuterClass now include the
BrowseChildrenRequest/BrowseChildrenReply types and stub methods.
2026-05-28 14:21:56 -04:00
Joseph Doherty 97e583e96b docs: implementation plan for per-language LazyBrowseNode walker
9 tasks: Java toolchain install (Homebrew), 5 parallel per-language
walker implementations, README updates, final verification. Java
walker is gated on toolchain bootstrap success; other languages
proceed independently if Java fails.
2026-05-28 14:17:52 -04:00
Joseph Doherty eaf479349d docs: design for client-side LazyBrowseNode walker + per-language tests
Adds one high-level walker per client (.NET/Python/Rust/Go/Java) plus
six unit tests each against existing fake transports. One-shot idempotent
Expand semantics; pagination hidden inside the helper. Includes Java
toolchain bootstrap (Homebrew Temurin + Gradle) so the Java client can
build locally on the macOS dev host.
2026-05-28 14:12:03 -04:00
23 changed files with 7888 additions and 29 deletions
@@ -123,6 +123,39 @@ internal sealed class FakeGalaxyRepositoryTransport(MxGatewayClientOptions optio
: DiscoverHierarchyReply);
}
/// <summary>Records BrowseChildren RPC calls made by the client.</summary>
public List<(BrowseChildrenRequest Request, CallOptions CallOptions)> BrowseChildrenCalls { get; } = [];
/// <summary>Default reply returned from BrowseChildren when the queue is empty.</summary>
public BrowseChildrenReply BrowseChildrenReply { get; set; } = new();
/// <summary>Queue of replies returned from BrowseChildren; dequeued in FIFO order.</summary>
public Queue<BrowseChildrenReply> BrowseChildrenReplies { get; } = new();
/// <summary>Queue of exceptions to throw from BrowseChildren; dequeued in FIFO order.</summary>
public Queue<Exception> BrowseChildrenExceptions { get; } = new();
/// <summary>
/// Records the request and either throws a queued exception or returns the configured reply.
/// </summary>
/// <param name="request">The BrowseChildrenRequest to process.</param>
/// <param name="callOptions">Call options specifying RPC behavior.</param>
public Task<BrowseChildrenReply> BrowseChildrenAsync(
BrowseChildrenRequest request,
CallOptions callOptions)
{
BrowseChildrenCalls.Add((request, callOptions));
if (BrowseChildrenExceptions.TryDequeue(out Exception? exception))
{
return Task.FromException<BrowseChildrenReply>(exception);
}
return Task.FromResult(
BrowseChildrenReplies.TryDequeue(out BrowseChildrenReply? reply)
? reply
: BrowseChildrenReply);
}
/// <summary>
/// Gets the list of WatchDeployEvents RPC calls made by the client.
/// </summary>
@@ -0,0 +1,188 @@
using Grpc.Core;
using ZB.MOM.WW.MxGateway.Contracts.Proto.Galaxy;
namespace ZB.MOM.WW.MxGateway.Client.Tests;
/// <summary>
/// Tests for the <see cref="LazyBrowseNode"/> walker over the BrowseChildren RPC.
/// </summary>
public sealed class LazyBrowseNodeTests
{
/// <summary>
/// Verifies that calling BrowseAsync with no parent returns the root nodes
/// from the first BrowseChildren reply and surfaces the per-child has-children hint.
/// </summary>
[Fact]
public async Task Browse_NoParent_ReturnsRoots()
{
FakeGalaxyRepositoryTransport transport = CreateTransport();
transport.BrowseChildrenReplies.Enqueue(BuildReply(
children: [BuildObject(1, "Plant", isArea: true), BuildObject(2, "Other")],
childHasChildren: [true, false],
cacheSequence: 1));
await using GalaxyRepositoryClient client = CreateClient(transport);
IReadOnlyList<LazyBrowseNode> roots = await client.BrowseAsync();
Assert.Equal(2, roots.Count);
Assert.Equal("Plant", roots[0].Object.TagName);
Assert.True(roots[0].HasChildrenHint);
Assert.False(roots[0].IsExpanded);
Assert.Equal("Other", roots[1].Object.TagName);
Assert.False(roots[1].HasChildrenHint);
Assert.False(roots[1].IsExpanded);
}
/// <summary>
/// Verifies that ExpandAsync populates Children and marks the node expanded after one RPC.
/// </summary>
[Fact]
public async Task Expand_PopulatesChildrenAndMarksExpanded()
{
FakeGalaxyRepositoryTransport transport = CreateTransport();
transport.BrowseChildrenReplies.Enqueue(BuildReply(
children: [BuildObject(1, "Plant", isArea: true)],
childHasChildren: [true],
cacheSequence: 1));
transport.BrowseChildrenReplies.Enqueue(BuildReply(
children: [BuildObject(10, "Line1")],
childHasChildren: [false],
cacheSequence: 1));
await using GalaxyRepositoryClient client = CreateClient(transport);
IReadOnlyList<LazyBrowseNode> roots = await client.BrowseAsync();
await roots[0].ExpandAsync();
Assert.True(roots[0].IsExpanded);
Assert.Single(roots[0].Children);
Assert.Equal("Line1", roots[0].Children[0].Object.TagName);
Assert.Equal(2, transport.BrowseChildrenCalls.Count);
}
/// <summary>
/// Verifies that a second ExpandAsync call is a no-op and issues no additional RPC.
/// </summary>
[Fact]
public async Task Expand_CalledTwice_NoSecondRpc()
{
FakeGalaxyRepositoryTransport transport = CreateTransport();
transport.BrowseChildrenReplies.Enqueue(BuildReply(
children: [BuildObject(1, "Plant", isArea: true)],
childHasChildren: [true],
cacheSequence: 1));
transport.BrowseChildrenReplies.Enqueue(BuildReply(
children: [BuildObject(10, "Line1")],
childHasChildren: [false],
cacheSequence: 1));
await using GalaxyRepositoryClient client = CreateClient(transport);
IReadOnlyList<LazyBrowseNode> roots = await client.BrowseAsync();
await roots[0].ExpandAsync();
await roots[0].ExpandAsync();
Assert.Equal(2, transport.BrowseChildrenCalls.Count);
}
/// <summary>
/// Verifies that an RPC failure (NotFound) during expand is wrapped in MxGatewayException.
/// </summary>
[Fact]
public async Task Expand_UnknownParent_ThrowsMxGatewayException()
{
FakeGalaxyRepositoryTransport transport = CreateTransport();
transport.BrowseChildrenReplies.Enqueue(BuildReply(
children: [BuildObject(1, "Plant", isArea: true)],
childHasChildren: [true],
cacheSequence: 1));
await using GalaxyRepositoryClient client = CreateClient(transport);
IReadOnlyList<LazyBrowseNode> roots = await client.BrowseAsync();
// Queue the failure for the upcoming ExpandAsync call so it consumes
// the exception on its first RPC rather than the BrowseAsync above.
transport.BrowseChildrenExceptions.Enqueue(
new MxGatewayException(
"Parent not found",
new RpcException(new Status(StatusCode.NotFound, "Parent not found"))));
await Assert.ThrowsAsync<MxGatewayException>(async () => await roots[0].ExpandAsync());
}
/// <summary>
/// Verifies that ExpandAsync drains multi-page sibling replies and forwards the page token.
/// </summary>
[Fact]
public async Task Expand_MultiPageSiblings_GathersAllPages()
{
FakeGalaxyRepositoryTransport transport = CreateTransport();
// Roots
transport.BrowseChildrenReplies.Enqueue(BuildReply(
children: [BuildObject(7, "Plant", isArea: true)],
childHasChildren: [true],
cacheSequence: 1));
// First child page (2 children) with a next token
BrowseChildrenReply childPage1 = BuildReply(
children: [BuildObject(70, "ChildA"), BuildObject(71, "ChildB")],
childHasChildren: [false, false],
cacheSequence: 1);
childPage1.NextPageToken = "7:abc:2";
transport.BrowseChildrenReplies.Enqueue(childPage1);
// Second child page (1 child) with no next token
transport.BrowseChildrenReplies.Enqueue(BuildReply(
children: [BuildObject(72, "ChildC")],
childHasChildren: [false],
cacheSequence: 1));
await using GalaxyRepositoryClient client = CreateClient(transport);
IReadOnlyList<LazyBrowseNode> roots = await client.BrowseAsync();
await roots[0].ExpandAsync();
Assert.Equal(3, roots[0].Children.Count);
Assert.Equal(3, transport.BrowseChildrenCalls.Count);
Assert.Equal("7:abc:2", transport.BrowseChildrenCalls[2].Request.PageToken);
}
/// <summary>
/// Verifies that BrowseChildrenOptions filter fields are forwarded to the BrowseChildren request.
/// </summary>
[Fact]
public async Task Browse_WithFilter_ForwardsToRequest()
{
FakeGalaxyRepositoryTransport transport = CreateTransport();
await using GalaxyRepositoryClient client = CreateClient(transport);
await client.BrowseAsync(new BrowseChildrenOptions
{
TagNameGlob = "Mixer*",
AlarmBearingOnly = true,
});
BrowseChildrenRequest request = Assert.Single(transport.BrowseChildrenCalls).Request;
Assert.Equal("Mixer*", request.TagNameGlob);
Assert.True(request.AlarmBearingOnly);
}
private static GalaxyObject BuildObject(int id, string tag, bool isArea = false)
=> new() { GobjectId = id, TagName = tag, BrowseName = tag, IsArea = isArea };
private static BrowseChildrenReply BuildReply(
IReadOnlyList<GalaxyObject> children,
IReadOnlyList<bool> childHasChildren,
ulong cacheSequence)
{
BrowseChildrenReply reply = new() { TotalChildCount = children.Count, CacheSequence = cacheSequence };
reply.Children.AddRange(children);
reply.ChildHasChildren.AddRange(childHasChildren);
return reply;
}
private static GalaxyRepositoryClient CreateClient(FakeGalaxyRepositoryTransport transport)
=> new(transport.Options, transport);
private static FakeGalaxyRepositoryTransport CreateTransport()
=> new(new MxGatewayClientOptions
{
Endpoint = new Uri("http://localhost:5000"),
ApiKey = "test-api-key",
});
}
@@ -0,0 +1,26 @@
namespace ZB.MOM.WW.MxGateway.Client;
/// <summary>
/// Filters and shape options for <see cref="GalaxyRepositoryClient.BrowseAsync(BrowseChildrenOptions, System.Threading.CancellationToken)"/>.
/// Mirror of <see cref="DiscoverHierarchyOptions"/> for the lazy-browse path.
/// </summary>
public sealed class BrowseChildrenOptions
{
/// <summary>Restrict to children whose Galaxy category is in this set.</summary>
public IReadOnlyList<int> CategoryIds { get; init; } = [];
/// <summary>Restrict to children whose template chain contains any of these tokens.</summary>
public IReadOnlyList<string> TemplateChainContains { get; init; } = [];
/// <summary>Optional glob-style filter on <c>tag_name</c>.</summary>
public string? TagNameGlob { get; init; }
/// <summary>Whether to populate each <c>GalaxyObject.Attributes</c>. Null leaves the server default.</summary>
public bool? IncludeAttributes { get; init; }
/// <summary>Restrict to children that bear at least one alarm attribute.</summary>
public bool AlarmBearingOnly { get; init; }
/// <summary>Restrict to children that have at least one historized attribute.</summary>
public bool HistorizedOnly { get; init; }
}
@@ -19,6 +19,7 @@ namespace ZB.MOM.WW.MxGateway.Client;
public sealed class GalaxyRepositoryClient : IAsyncDisposable
{
private const int DiscoverHierarchyPageSize = 5000;
private const int BrowseChildrenPageSize = 500;
private readonly GrpcChannel? _channel;
private readonly IGalaxyRepositoryClientTransport _transport;
@@ -278,6 +279,89 @@ public sealed class GalaxyRepositoryClient : IAsyncDisposable
cancellationToken);
}
/// <summary>Returns root-level browse nodes (objects with no parent).</summary>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The list of root <see cref="LazyBrowseNode"/> instances.</returns>
public Task<IReadOnlyList<LazyBrowseNode>> BrowseAsync(CancellationToken cancellationToken = default)
=> BrowseAsync(null, cancellationToken);
/// <summary>Returns root-level browse nodes filtered by the given options.</summary>
/// <param name="options">Browse filter options. Null applies no filter.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The list of root <see cref="LazyBrowseNode"/> instances.</returns>
public async Task<IReadOnlyList<LazyBrowseNode>> BrowseAsync(
BrowseChildrenOptions? options,
CancellationToken cancellationToken = default)
{
BrowseChildrenOptions effective = options ?? new BrowseChildrenOptions();
List<LazyBrowseNode> roots = [];
string pageToken = string.Empty;
HashSet<string> seenPageTokens = new(StringComparer.Ordinal);
do
{
BrowseChildrenRequest request = BuildBrowseChildrenRequest(effective);
request.PageToken = pageToken;
BrowseChildrenReply reply = await BrowseChildrenRawAsync(request, cancellationToken).ConfigureAwait(false);
for (int i = 0; i < reply.Children.Count; i++)
{
bool hint = i < reply.ChildHasChildren.Count && reply.ChildHasChildren[i];
roots.Add(new LazyBrowseNode(this, reply.Children[i], hint, effective));
}
pageToken = reply.NextPageToken;
if (!string.IsNullOrWhiteSpace(pageToken) && !seenPageTokens.Add(pageToken))
{
throw new MxGatewayException(
$"Galaxy BrowseChildren returned a repeated page token '{pageToken}'.");
}
}
while (!string.IsNullOrWhiteSpace(pageToken));
return roots;
}
/// <summary>Issues a raw BrowseChildren RPC without result wrapping.</summary>
/// <param name="request">The browse-children request.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The raw server reply.</returns>
public Task<BrowseChildrenReply> BrowseChildrenRawAsync(
BrowseChildrenRequest request,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(request);
ThrowIfDisposed();
return ExecuteSafeUnaryAsync(
token => _transport.BrowseChildrenAsync(request, CreateCallOptions(token)),
cancellationToken);
}
internal static BrowseChildrenRequest BuildBrowseChildrenRequest(BrowseChildrenOptions options)
{
ArgumentNullException.ThrowIfNull(options);
BrowseChildrenRequest request = new()
{
PageSize = BrowseChildrenPageSize,
AlarmBearingOnly = options.AlarmBearingOnly,
HistorizedOnly = options.HistorizedOnly,
};
request.CategoryIds.Add(options.CategoryIds);
request.TemplateChainContains.Add(options.TemplateChainContains);
if (!string.IsNullOrWhiteSpace(options.TagNameGlob))
{
request.TagNameGlob = options.TagNameGlob;
}
if (options.IncludeAttributes.HasValue)
{
request.IncludeAttributes = options.IncludeAttributes.Value;
}
return request;
}
/// <summary>
/// Subscribes to Galaxy deploy events. The server emits a bootstrap event with the
/// current state on subscribe so callers can prime their cache, then emits one event
@@ -74,6 +74,23 @@ internal sealed class GrpcGalaxyRepositoryClientTransport(
}
}
/// <inheritdoc />
public async Task<BrowseChildrenReply> BrowseChildrenAsync(
BrowseChildrenRequest request,
CallOptions callOptions)
{
try
{
return await RawClient.BrowseChildrenAsync(request, callOptions)
.ResponseAsync
.ConfigureAwait(false);
}
catch (RpcException exception)
{
throw MapRpcException(exception, callOptions.CancellationToken);
}
}
/// <inheritdoc />
public async IAsyncEnumerable<DeployEvent> WatchDeployEventsAsync(
WatchDeployEventsRequest request,
@@ -33,6 +33,13 @@ internal interface IGalaxyRepositoryClientTransport
DiscoverHierarchyRequest request,
CallOptions callOptions);
/// <summary>Returns direct children of a parent in the Galaxy hierarchy.</summary>
/// <param name="request">The browse children request.</param>
/// <param name="callOptions">gRPC call options (timeout, cancellation, etc.).</param>
Task<BrowseChildrenReply> BrowseChildrenAsync(
BrowseChildrenRequest request,
CallOptions callOptions);
/// <summary>Watches for deployment events from the Galaxy Repository server.</summary>
/// <param name="request">The watch deploy events request.</param>
/// <param name="callOptions">gRPC call options (timeout, cancellation, etc.).</param>
@@ -0,0 +1,83 @@
using ZB.MOM.WW.MxGateway.Contracts.Proto.Galaxy;
namespace ZB.MOM.WW.MxGateway.Client;
/// <summary>
/// One node in a lazy-loaded Galaxy browse tree. Holds the underlying
/// <see cref="GalaxyObject"/> and exposes <see cref="ExpandAsync"/> to fetch
/// its direct children on demand. Expansion is one-shot: a second call is a
/// no-op. Pagination of large sibling sets is handled internally.
/// </summary>
public sealed class LazyBrowseNode
{
private readonly GalaxyRepositoryClient _client;
private readonly BrowseChildrenOptions _options;
private readonly List<LazyBrowseNode> _children = [];
private bool _isExpanded;
internal LazyBrowseNode(
GalaxyRepositoryClient client,
GalaxyObject @object,
bool hasChildrenHint,
BrowseChildrenOptions options)
{
_client = client;
Object = @object;
HasChildrenHint = hasChildrenHint;
_options = options;
}
/// <summary>The underlying Galaxy object for this node.</summary>
public GalaxyObject Object { get; }
/// <summary>True when the server reports this node has at least one matching descendant.</summary>
public bool HasChildrenHint { get; }
/// <summary>Direct children loaded by <see cref="ExpandAsync"/>; empty until then.</summary>
public IReadOnlyList<LazyBrowseNode> Children => _children;
/// <summary>True after the first <see cref="ExpandAsync"/> call completes.</summary>
public bool IsExpanded => _isExpanded;
/// <summary>
/// Fetches direct children from the gateway and populates <see cref="Children"/>.
/// Idempotent: subsequent calls are no-ops.
/// </summary>
/// <param name="cancellationToken">Token to observe for cancellation.</param>
public async Task ExpandAsync(CancellationToken cancellationToken = default)
{
if (_isExpanded)
{
return;
}
string pageToken = string.Empty;
HashSet<string> seenPageTokens = new(StringComparer.Ordinal);
do
{
BrowseChildrenRequest request = GalaxyRepositoryClient.BuildBrowseChildrenRequest(_options);
request.ParentGobjectId = Object.GobjectId;
request.PageToken = pageToken;
BrowseChildrenReply reply = await _client
.BrowseChildrenRawAsync(request, cancellationToken)
.ConfigureAwait(false);
for (int i = 0; i < reply.Children.Count; i++)
{
bool hint = i < reply.ChildHasChildren.Count && reply.ChildHasChildren[i];
_children.Add(new LazyBrowseNode(_client, reply.Children[i], hint, _options));
}
pageToken = reply.NextPageToken;
if (!string.IsNullOrWhiteSpace(pageToken) && !seenPageTokens.Add(pageToken))
{
throw new MxGatewayException(
$"Galaxy BrowseChildren returned a repeated page token '{pageToken}'.");
}
}
while (!string.IsNullOrWhiteSpace(pageToken));
_isExpanded = true;
}
}
+143
View File
@@ -3,7 +3,9 @@ package mxgateway
import (
"context"
"errors"
"fmt"
"io"
"sync"
"time"
pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated"
@@ -13,6 +15,9 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)
// browseChildrenPageSize is the per-request page size used by the lazy walker.
const browseChildrenPageSize = 500
// RawGalaxyRepositoryClient is the generated gRPC client interface for the
// Galaxy Repository service exposed for callers that need direct contract
// access.
@@ -40,6 +45,10 @@ type (
WatchDeployEventsRequest = pb.WatchDeployEventsRequest
// DeployEvent is one Galaxy Repository deploy event.
DeployEvent = pb.DeployEvent
// BrowseChildrenRequest is the request for BrowseChildren.
BrowseChildrenRequest = pb.BrowseChildrenRequest
// BrowseChildrenReply is the reply for BrowseChildren.
BrowseChildrenReply = pb.BrowseChildrenReply
)
// RawDeployEventStream is the generated WatchDeployEvents client stream.
@@ -238,6 +247,140 @@ func (c *GalaxyClient) Close() error {
return c.conn.Close()
}
// LazyBrowseNode is one node in a lazy Galaxy hierarchy walk produced by
// (*GalaxyClient).Browse. Children are not fetched until Expand is called.
// The node is safe for concurrent use; concurrent Expand calls collapse to a
// single RPC.
type LazyBrowseNode struct {
client *GalaxyClient
object *pb.GalaxyObject
hasChildrenHint bool
options BrowseChildrenOptions
mu sync.Mutex
children []*LazyBrowseNode
isExpanded bool
}
// Object returns the underlying GalaxyObject describing this node.
func (n *LazyBrowseNode) Object() *pb.GalaxyObject { return n.object }
// HasChildrenHint reports the server-supplied hint on whether this node has
// matching descendants under the current filter set.
func (n *LazyBrowseNode) HasChildrenHint() bool { return n.hasChildrenHint }
// Children returns a snapshot copy of the currently-loaded child nodes. Returns
// an empty slice when Expand has not yet been called.
func (n *LazyBrowseNode) Children() []*LazyBrowseNode {
n.mu.Lock()
defer n.mu.Unlock()
out := make([]*LazyBrowseNode, len(n.children))
copy(out, n.children)
return out
}
// IsExpanded reports whether Expand has completed successfully on this node.
func (n *LazyBrowseNode) IsExpanded() bool {
n.mu.Lock()
defer n.mu.Unlock()
return n.isExpanded
}
// Expand fetches this node's direct children via BrowseChildren when they have
// not yet been loaded. Subsequent calls after a successful Expand are a no-op
// and do not issue another RPC.
func (n *LazyBrowseNode) Expand(ctx context.Context) error {
n.mu.Lock()
defer n.mu.Unlock()
if n.isExpanded {
return nil
}
parentID := n.object.GetGobjectId()
children, err := n.client.browseChildrenInner(ctx, &parentID, n.options)
if err != nil {
return err
}
n.children = children
n.isExpanded = true
return nil
}
// Browse returns the root nodes of the Galaxy hierarchy. The returned nodes
// have only their server-supplied hints populated; call Expand on each node to
// fetch its direct children. When opts is nil the server defaults apply.
func (c *GalaxyClient) Browse(ctx context.Context, opts *BrowseChildrenOptions) ([]*LazyBrowseNode, error) {
effective := BrowseChildrenOptions{}
if opts != nil {
effective = *opts
}
return c.browseChildrenInner(ctx, nil, effective)
}
// BrowseChildrenRaw issues a single BrowseChildren RPC and returns the raw
// reply for callers that need direct page-token control. Transport-level
// failures are wrapped in *GatewayError to match the rest of the client.
func (c *GalaxyClient) BrowseChildrenRaw(ctx context.Context, req *pb.BrowseChildrenRequest) (*pb.BrowseChildrenReply, error) {
callCtx, cancel := c.callContext(ctx)
defer cancel()
reply, err := c.raw.BrowseChildren(callCtx, req)
if err != nil {
return nil, &GatewayError{Op: "galaxy browse children", Err: err}
}
return reply, nil
}
func (c *GalaxyClient) browseChildrenInner(
ctx context.Context,
parentGobjectID *int32,
opts BrowseChildrenOptions,
) ([]*LazyBrowseNode, error) {
var nodes []*LazyBrowseNode
pageToken := ""
seen := map[string]struct{}{}
for {
req := &pb.BrowseChildrenRequest{
PageSize: browseChildrenPageSize,
PageToken: pageToken,
CategoryIds: opts.CategoryIds,
TemplateChainContains: opts.TemplateChainContains,
TagNameGlob: opts.TagNameGlob,
AlarmBearingOnly: opts.AlarmBearingOnly,
HistorizedOnly: opts.HistorizedOnly,
}
if parentGobjectID != nil {
req.Parent = &pb.BrowseChildrenRequest_ParentGobjectId{ParentGobjectId: *parentGobjectID}
}
if opts.IncludeAttributes != nil {
req.IncludeAttributes = opts.IncludeAttributes
}
reply, err := c.BrowseChildrenRaw(ctx, req)
if err != nil {
return nil, err
}
for i, child := range reply.GetChildren() {
hasChildren := reply.GetChildHasChildren()
hint := i < len(hasChildren) && hasChildren[i]
nodes = append(nodes, &LazyBrowseNode{
client: c,
object: child,
hasChildrenHint: hint,
options: opts,
})
}
pageToken = reply.GetNextPageToken()
if pageToken == "" {
return nodes, nil
}
if _, dup := seen[pageToken]; dup {
return nil, fmt.Errorf("mxgateway: galaxy browse children returned repeated page token %q", pageToken)
}
seen[pageToken] = struct{}{}
}
}
func (c *GalaxyClient) callContext(ctx context.Context) (context.Context, context.CancelFunc) {
timeout := c.opts.CallTimeout
if timeout == 0 {
+322 -9
View File
@@ -9,6 +9,8 @@ import (
pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/grpc/test/bufconn"
"google.golang.org/protobuf/types/known/timestamppb"
)
@@ -370,15 +372,18 @@ func newGalaxyBufconnClient(t *testing.T, fake *fakeGalaxyServer) (*GalaxyClient
type fakeGalaxyServer struct {
pb.UnimplementedGalaxyRepositoryServer
testReply *pb.TestConnectionReply
testAuth string
failTest bool
deployReply *pb.GetLastDeployTimeReply
discoverReply *pb.DiscoverHierarchyReply
watchEvents []*pb.DeployEvent
watchRequest *pb.WatchDeployEventsRequest
watchSendInterval time.Duration
watchHoldOpen bool
testReply *pb.TestConnectionReply
testAuth string
failTest bool
deployReply *pb.GetLastDeployTimeReply
discoverReply *pb.DiscoverHierarchyReply
watchEvents []*pb.DeployEvent
watchRequest *pb.WatchDeployEventsRequest
watchSendInterval time.Duration
watchHoldOpen bool
browseChildrenCalls []*pb.BrowseChildrenRequest
browseChildrenReplies []*pb.BrowseChildrenReply
browseChildrenError error
}
func (s *fakeGalaxyServer) TestConnection(ctx context.Context, req *pb.TestConnectionRequest) (*pb.TestConnectionReply, error) {
@@ -425,3 +430,311 @@ func (s *fakeGalaxyServer) WatchDeployEvents(req *pb.WatchDeployEventsRequest, s
}
return nil
}
func (s *fakeGalaxyServer) BrowseChildren(ctx context.Context, req *pb.BrowseChildrenRequest) (*pb.BrowseChildrenReply, error) {
s.browseChildrenCalls = append(s.browseChildrenCalls, req)
if s.browseChildrenError != nil {
err := s.browseChildrenError
s.browseChildrenError = nil
return nil, err
}
if len(s.browseChildrenReplies) == 0 {
return &pb.BrowseChildrenReply{}, nil
}
reply := s.browseChildrenReplies[0]
s.browseChildrenReplies = s.browseChildrenReplies[1:]
return reply, nil
}
func obj(id int32, tag string, isArea bool) *pb.GalaxyObject {
return &pb.GalaxyObject{
GobjectId: id,
TagName: tag,
BrowseName: tag,
IsArea: isArea,
}
}
func buildBrowseReply(children []*pb.GalaxyObject, hasChildren []bool, seq uint64) *pb.BrowseChildrenReply {
return &pb.BrowseChildrenReply{
TotalChildCount: int32(len(children)),
CacheSequence: seq,
Children: children,
ChildHasChildren: hasChildren,
}
}
func TestGalaxyBrowseNoParentReturnsRoots(t *testing.T) {
fake := &fakeGalaxyServer{
browseChildrenReplies: []*pb.BrowseChildrenReply{
buildBrowseReply(
[]*pb.GalaxyObject{obj(1, "Plant", true), obj(99, "Other", false)},
[]bool{true, false},
7,
),
},
}
client, cleanup := newGalaxyBufconnClient(t, fake)
defer cleanup()
roots, err := client.Browse(context.Background(), nil)
if err != nil {
t.Fatalf("Browse: %v", err)
}
if got, want := len(roots), 2; got != want {
t.Fatalf("len(roots) = %d, want %d", got, want)
}
if roots[0].Object().GetTagName() != "Plant" {
t.Fatalf("roots[0].TagName = %q", roots[0].Object().GetTagName())
}
if !roots[0].HasChildrenHint() {
t.Fatal("roots[0].HasChildrenHint = false, want true")
}
if roots[0].IsExpanded() {
t.Fatal("roots[0].IsExpanded = true, want false")
}
if roots[1].HasChildrenHint() {
t.Fatal("roots[1].HasChildrenHint = true, want false")
}
if len(fake.browseChildrenCalls) != 1 {
t.Fatalf("BrowseChildren calls = %d, want 1", len(fake.browseChildrenCalls))
}
if fake.browseChildrenCalls[0].GetParent() != nil {
t.Fatalf("root browse should not set Parent oneof, got %T", fake.browseChildrenCalls[0].GetParent())
}
}
func TestGalaxyBrowseExpandPopulatesChildrenAndMarksExpanded(t *testing.T) {
fake := &fakeGalaxyServer{
browseChildrenReplies: []*pb.BrowseChildrenReply{
buildBrowseReply(
[]*pb.GalaxyObject{obj(1, "Plant", true)},
[]bool{true},
1,
),
buildBrowseReply(
[]*pb.GalaxyObject{obj(10, "Area1", true), obj(11, "Tank1", false)},
[]bool{true, false},
1,
),
},
}
client, cleanup := newGalaxyBufconnClient(t, fake)
defer cleanup()
roots, err := client.Browse(context.Background(), nil)
if err != nil {
t.Fatalf("Browse: %v", err)
}
if len(roots) != 1 {
t.Fatalf("len(roots) = %d, want 1", len(roots))
}
plant := roots[0]
if plant.IsExpanded() {
t.Fatal("plant.IsExpanded = true before Expand, want false")
}
if err := plant.Expand(context.Background()); err != nil {
t.Fatalf("Expand: %v", err)
}
if !plant.IsExpanded() {
t.Fatal("plant.IsExpanded = false after Expand, want true")
}
children := plant.Children()
if len(children) != 2 {
t.Fatalf("len(children) = %d, want 2", len(children))
}
if children[0].Object().GetTagName() != "Area1" {
t.Fatalf("children[0].TagName = %q, want Area1", children[0].Object().GetTagName())
}
if !children[0].HasChildrenHint() {
t.Fatal("children[0].HasChildrenHint = false, want true")
}
if children[1].HasChildrenHint() {
t.Fatal("children[1].HasChildrenHint = true, want false")
}
if len(fake.browseChildrenCalls) != 2 {
t.Fatalf("BrowseChildren calls = %d, want 2", len(fake.browseChildrenCalls))
}
parent := fake.browseChildrenCalls[1].GetParent()
parentGobj, ok := parent.(*pb.BrowseChildrenRequest_ParentGobjectId)
if !ok {
t.Fatalf("Parent oneof = %T, want *BrowseChildrenRequest_ParentGobjectId", parent)
}
if parentGobj.ParentGobjectId != 1 {
t.Fatalf("ParentGobjectId = %d, want 1", parentGobj.ParentGobjectId)
}
}
func TestGalaxyBrowseExpandIdempotentNoSecondRpc(t *testing.T) {
fake := &fakeGalaxyServer{
browseChildrenReplies: []*pb.BrowseChildrenReply{
buildBrowseReply(
[]*pb.GalaxyObject{obj(1, "Plant", true)},
[]bool{true},
1,
),
buildBrowseReply(
[]*pb.GalaxyObject{obj(10, "Area1", true)},
[]bool{false},
1,
),
},
}
client, cleanup := newGalaxyBufconnClient(t, fake)
defer cleanup()
roots, err := client.Browse(context.Background(), nil)
if err != nil {
t.Fatalf("Browse: %v", err)
}
plant := roots[0]
if err := plant.Expand(context.Background()); err != nil {
t.Fatalf("Expand #1: %v", err)
}
callsAfterFirst := len(fake.browseChildrenCalls)
if callsAfterFirst != 2 {
t.Fatalf("BrowseChildren calls after first Expand = %d, want 2", callsAfterFirst)
}
if err := plant.Expand(context.Background()); err != nil {
t.Fatalf("Expand #2: %v", err)
}
if got := len(fake.browseChildrenCalls); got != callsAfterFirst {
t.Fatalf("BrowseChildren calls after second Expand = %d, want %d (no extra RPC)", got, callsAfterFirst)
}
}
func TestGalaxyBrowseExpandUnknownParentReturnsNotFoundError(t *testing.T) {
fake := &fakeGalaxyServer{
browseChildrenReplies: []*pb.BrowseChildrenReply{
buildBrowseReply(
[]*pb.GalaxyObject{obj(1, "Plant", true)},
[]bool{true},
1,
),
},
browseChildrenError: status.Error(codes.NotFound, "parent not found"),
}
// The first Browse() consumes the first reply; the next call (Expand) will
// then hit browseChildrenError. We need the error to fire only on the second
// call, so seed the reply first and let the call sequence consume them in
// order. Because BrowseChildren in the fake consumes browseChildrenError
// before falling through to replies, swap the strategy: keep the root reply
// but have BrowseChildren return the error on the second call. We do this by
// emptying the reply list after the first Browse.
client, cleanup := newGalaxyBufconnClient(t, fake)
defer cleanup()
// First call returns the error (because browseChildrenError takes precedence).
// To avoid that, clear it for the root call by performing a manual setup: we
// pre-stage replies first, then set the error after the first call. Easiest:
// pre-Browse() with error=nil, then set error before Expand.
fake.browseChildrenError = nil
roots, err := client.Browse(context.Background(), nil)
if err != nil {
t.Fatalf("Browse: %v", err)
}
if len(roots) != 1 {
t.Fatalf("len(roots) = %d, want 1", len(roots))
}
fake.browseChildrenError = status.Error(codes.NotFound, "parent not found")
err = roots[0].Expand(context.Background())
if err == nil {
t.Fatal("Expand: error = nil, want NotFound")
}
if status.Code(err) != codes.NotFound {
t.Fatalf("status.Code = %s, want NotFound", status.Code(err))
}
if roots[0].IsExpanded() {
t.Fatal("roots[0].IsExpanded = true after failed Expand, want false")
}
}
func TestGalaxyBrowseExpandMultiPageGathersAllPages(t *testing.T) {
firstPage := buildBrowseReply(
[]*pb.GalaxyObject{obj(1, "Plant", true)},
[]bool{true},
7,
)
pageA := buildBrowseReply(
[]*pb.GalaxyObject{obj(10, "Child1", false), obj(11, "Child2", false)},
[]bool{false, false},
7,
)
pageA.NextPageToken = "7:abc:2"
pageB := buildBrowseReply(
[]*pb.GalaxyObject{obj(12, "Child3", false)},
[]bool{false},
7,
)
fake := &fakeGalaxyServer{
browseChildrenReplies: []*pb.BrowseChildrenReply{firstPage, pageA, pageB},
}
client, cleanup := newGalaxyBufconnClient(t, fake)
defer cleanup()
roots, err := client.Browse(context.Background(), nil)
if err != nil {
t.Fatalf("Browse: %v", err)
}
if err := roots[0].Expand(context.Background()); err != nil {
t.Fatalf("Expand: %v", err)
}
children := roots[0].Children()
if len(children) != 3 {
t.Fatalf("len(children) = %d, want 3", len(children))
}
if len(fake.browseChildrenCalls) != 3 {
t.Fatalf("BrowseChildren calls = %d, want 3", len(fake.browseChildrenCalls))
}
if got := fake.browseChildrenCalls[2].GetPageToken(); got != "7:abc:2" {
t.Fatalf("third call PageToken = %q, want %q", got, "7:abc:2")
}
}
func TestGalaxyBrowseWithFilterForwardsToRequest(t *testing.T) {
fake := &fakeGalaxyServer{
browseChildrenReplies: []*pb.BrowseChildrenReply{
buildBrowseReply(nil, nil, 1),
},
}
client, cleanup := newGalaxyBufconnClient(t, fake)
defer cleanup()
include := true
opts := &BrowseChildrenOptions{
CategoryIds: []int32{7, 9},
TemplateChainContains: []string{"$AppObject"},
TagNameGlob: "Tank*",
IncludeAttributes: &include,
AlarmBearingOnly: true,
HistorizedOnly: true,
}
if _, err := client.Browse(context.Background(), opts); err != nil {
t.Fatalf("Browse: %v", err)
}
if len(fake.browseChildrenCalls) != 1 {
t.Fatalf("BrowseChildren calls = %d, want 1", len(fake.browseChildrenCalls))
}
got := fake.browseChildrenCalls[0]
if want := []int32{7, 9}; len(got.GetCategoryIds()) != 2 || got.GetCategoryIds()[0] != want[0] || got.GetCategoryIds()[1] != want[1] {
t.Fatalf("CategoryIds = %v, want %v", got.GetCategoryIds(), want)
}
if want := []string{"$AppObject"}; len(got.GetTemplateChainContains()) != 1 || got.GetTemplateChainContains()[0] != want[0] {
t.Fatalf("TemplateChainContains = %v, want %v", got.GetTemplateChainContains(), want)
}
if got.GetTagNameGlob() != "Tank*" {
t.Fatalf("TagNameGlob = %q, want %q", got.GetTagNameGlob(), "Tank*")
}
if !got.GetIncludeAttributes() {
t.Fatal("IncludeAttributes = false, want true")
}
if !got.GetAlarmBearingOnly() {
t.Fatal("AlarmBearingOnly = false, want true")
}
if !got.GetHistorizedOnly() {
t.Fatal("HistorizedOnly = false, want true")
}
}
+22
View File
@@ -36,6 +36,28 @@ type Options struct {
DialOptions []grpc.DialOption
}
// BrowseChildrenOptions configures lazy Galaxy hierarchy walks performed by
// (*GalaxyClient).Browse and (*LazyBrowseNode).Expand. All fields are optional;
// the zero value matches the dashboard default (no filters, all attributes per
// the server default).
type BrowseChildrenOptions struct {
// CategoryIds restricts results to the listed Galaxy category ids when set.
CategoryIds []int32
// TemplateChainContains restricts results to objects whose template chain
// contains any of the listed template tag names.
TemplateChainContains []string
// TagNameGlob restricts results to objects whose tag name matches the glob
// pattern when non-empty.
TagNameGlob string
// IncludeAttributes overrides the server default for attribute inclusion when
// non-nil. The pointer form mirrors the proto's optional field.
IncludeAttributes *bool
// AlarmBearingOnly limits results to alarm-bearing objects when true.
AlarmBearingOnly bool
// HistorizedOnly limits results to historized objects when true.
HistorizedOnly bool
}
// RedactedAPIKey returns a display-safe representation of the configured API
// key for diagnostics and CLI output.
func (o Options) RedactedAPIKey() string {
@@ -142,6 +142,37 @@ public final class GalaxyRepositoryGrpc {
return getWatchDeployEventsMethod;
}
private static volatile io.grpc.MethodDescriptor<galaxy_repository.v1.GalaxyRepositoryOuterClass.BrowseChildrenRequest,
galaxy_repository.v1.GalaxyRepositoryOuterClass.BrowseChildrenReply> getBrowseChildrenMethod;
@io.grpc.stub.annotations.RpcMethod(
fullMethodName = SERVICE_NAME + '/' + "BrowseChildren",
requestType = galaxy_repository.v1.GalaxyRepositoryOuterClass.BrowseChildrenRequest.class,
responseType = galaxy_repository.v1.GalaxyRepositoryOuterClass.BrowseChildrenReply.class,
methodType = io.grpc.MethodDescriptor.MethodType.UNARY)
public static io.grpc.MethodDescriptor<galaxy_repository.v1.GalaxyRepositoryOuterClass.BrowseChildrenRequest,
galaxy_repository.v1.GalaxyRepositoryOuterClass.BrowseChildrenReply> getBrowseChildrenMethod() {
io.grpc.MethodDescriptor<galaxy_repository.v1.GalaxyRepositoryOuterClass.BrowseChildrenRequest, galaxy_repository.v1.GalaxyRepositoryOuterClass.BrowseChildrenReply> getBrowseChildrenMethod;
if ((getBrowseChildrenMethod = GalaxyRepositoryGrpc.getBrowseChildrenMethod) == null) {
synchronized (GalaxyRepositoryGrpc.class) {
if ((getBrowseChildrenMethod = GalaxyRepositoryGrpc.getBrowseChildrenMethod) == null) {
GalaxyRepositoryGrpc.getBrowseChildrenMethod = getBrowseChildrenMethod =
io.grpc.MethodDescriptor.<galaxy_repository.v1.GalaxyRepositoryOuterClass.BrowseChildrenRequest, galaxy_repository.v1.GalaxyRepositoryOuterClass.BrowseChildrenReply>newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.UNARY)
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "BrowseChildren"))
.setSampledToLocalTracing(true)
.setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
galaxy_repository.v1.GalaxyRepositoryOuterClass.BrowseChildrenRequest.getDefaultInstance()))
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
galaxy_repository.v1.GalaxyRepositoryOuterClass.BrowseChildrenReply.getDefaultInstance()))
.setSchemaDescriptor(new GalaxyRepositoryMethodDescriptorSupplier("BrowseChildren"))
.build();
}
}
}
return getBrowseChildrenMethod;
}
/**
* Creates a new async stub that supports all call types for the service
*/
@@ -246,6 +277,19 @@ public final class GalaxyRepositoryGrpc {
io.grpc.stub.StreamObserver<galaxy_repository.v1.GalaxyRepositoryOuterClass.DeployEvent> responseObserver) {
io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getWatchDeployEventsMethod(), responseObserver);
}
/**
* <pre>
* Returns the direct children of a parent object (or the root objects when
* `parent` is unset). Designed for OPC UA-style lazy expand: clients walk
* one level at a time instead of paging the full hierarchy. Filters mirror
* DiscoverHierarchy exactly. Backed by the same shared hierarchy cache.
* </pre>
*/
default void browseChildren(galaxy_repository.v1.GalaxyRepositoryOuterClass.BrowseChildrenRequest request,
io.grpc.stub.StreamObserver<galaxy_repository.v1.GalaxyRepositoryOuterClass.BrowseChildrenReply> responseObserver) {
io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getBrowseChildrenMethod(), responseObserver);
}
}
/**
@@ -326,6 +370,20 @@ public final class GalaxyRepositoryGrpc {
io.grpc.stub.ClientCalls.asyncServerStreamingCall(
getChannel().newCall(getWatchDeployEventsMethod(), getCallOptions()), request, responseObserver);
}
/**
* <pre>
* Returns the direct children of a parent object (or the root objects when
* `parent` is unset). Designed for OPC UA-style lazy expand: clients walk
* one level at a time instead of paging the full hierarchy. Filters mirror
* DiscoverHierarchy exactly. Backed by the same shared hierarchy cache.
* </pre>
*/
public void browseChildren(galaxy_repository.v1.GalaxyRepositoryOuterClass.BrowseChildrenRequest request,
io.grpc.stub.StreamObserver<galaxy_repository.v1.GalaxyRepositoryOuterClass.BrowseChildrenReply> responseObserver) {
io.grpc.stub.ClientCalls.asyncUnaryCall(
getChannel().newCall(getBrowseChildrenMethod(), getCallOptions()), request, responseObserver);
}
}
/**
@@ -387,6 +445,19 @@ public final class GalaxyRepositoryGrpc {
return io.grpc.stub.ClientCalls.blockingV2ServerStreamingCall(
getChannel(), getWatchDeployEventsMethod(), getCallOptions(), request);
}
/**
* <pre>
* Returns the direct children of a parent object (or the root objects when
* `parent` is unset). Designed for OPC UA-style lazy expand: clients walk
* one level at a time instead of paging the full hierarchy. Filters mirror
* DiscoverHierarchy exactly. Backed by the same shared hierarchy cache.
* </pre>
*/
public galaxy_repository.v1.GalaxyRepositoryOuterClass.BrowseChildrenReply browseChildren(galaxy_repository.v1.GalaxyRepositoryOuterClass.BrowseChildrenRequest request) throws io.grpc.StatusException {
return io.grpc.stub.ClientCalls.blockingV2UnaryCall(
getChannel(), getBrowseChildrenMethod(), getCallOptions(), request);
}
}
/**
@@ -447,6 +518,19 @@ public final class GalaxyRepositoryGrpc {
return io.grpc.stub.ClientCalls.blockingServerStreamingCall(
getChannel(), getWatchDeployEventsMethod(), getCallOptions(), request);
}
/**
* <pre>
* Returns the direct children of a parent object (or the root objects when
* `parent` is unset). Designed for OPC UA-style lazy expand: clients walk
* one level at a time instead of paging the full hierarchy. Filters mirror
* DiscoverHierarchy exactly. Backed by the same shared hierarchy cache.
* </pre>
*/
public galaxy_repository.v1.GalaxyRepositoryOuterClass.BrowseChildrenReply browseChildren(galaxy_repository.v1.GalaxyRepositoryOuterClass.BrowseChildrenRequest request) {
return io.grpc.stub.ClientCalls.blockingUnaryCall(
getChannel(), getBrowseChildrenMethod(), getCallOptions(), request);
}
}
/**
@@ -494,12 +578,27 @@ public final class GalaxyRepositoryGrpc {
return io.grpc.stub.ClientCalls.futureUnaryCall(
getChannel().newCall(getDiscoverHierarchyMethod(), getCallOptions()), request);
}
/**
* <pre>
* Returns the direct children of a parent object (or the root objects when
* `parent` is unset). Designed for OPC UA-style lazy expand: clients walk
* one level at a time instead of paging the full hierarchy. Filters mirror
* DiscoverHierarchy exactly. Backed by the same shared hierarchy cache.
* </pre>
*/
public com.google.common.util.concurrent.ListenableFuture<galaxy_repository.v1.GalaxyRepositoryOuterClass.BrowseChildrenReply> browseChildren(
galaxy_repository.v1.GalaxyRepositoryOuterClass.BrowseChildrenRequest request) {
return io.grpc.stub.ClientCalls.futureUnaryCall(
getChannel().newCall(getBrowseChildrenMethod(), getCallOptions()), request);
}
}
private static final int METHODID_TEST_CONNECTION = 0;
private static final int METHODID_GET_LAST_DEPLOY_TIME = 1;
private static final int METHODID_DISCOVER_HIERARCHY = 2;
private static final int METHODID_WATCH_DEPLOY_EVENTS = 3;
private static final int METHODID_BROWSE_CHILDREN = 4;
private static final class MethodHandlers<Req, Resp> implements
io.grpc.stub.ServerCalls.UnaryMethod<Req, Resp>,
@@ -534,6 +633,10 @@ public final class GalaxyRepositoryGrpc {
serviceImpl.watchDeployEvents((galaxy_repository.v1.GalaxyRepositoryOuterClass.WatchDeployEventsRequest) request,
(io.grpc.stub.StreamObserver<galaxy_repository.v1.GalaxyRepositoryOuterClass.DeployEvent>) responseObserver);
break;
case METHODID_BROWSE_CHILDREN:
serviceImpl.browseChildren((galaxy_repository.v1.GalaxyRepositoryOuterClass.BrowseChildrenRequest) request,
(io.grpc.stub.StreamObserver<galaxy_repository.v1.GalaxyRepositoryOuterClass.BrowseChildrenReply>) responseObserver);
break;
default:
throw new AssertionError();
}
@@ -580,6 +683,13 @@ public final class GalaxyRepositoryGrpc {
galaxy_repository.v1.GalaxyRepositoryOuterClass.WatchDeployEventsRequest,
galaxy_repository.v1.GalaxyRepositoryOuterClass.DeployEvent>(
service, METHODID_WATCH_DEPLOY_EVENTS)))
.addMethod(
getBrowseChildrenMethod(),
io.grpc.stub.ServerCalls.asyncUnaryCall(
new MethodHandlers<
galaxy_repository.v1.GalaxyRepositoryOuterClass.BrowseChildrenRequest,
galaxy_repository.v1.GalaxyRepositoryOuterClass.BrowseChildrenReply>(
service, METHODID_BROWSE_CHILDREN)))
.build();
}
@@ -632,6 +742,7 @@ public final class GalaxyRepositoryGrpc {
.addMethod(getGetLastDeployTimeMethod())
.addMethod(getDiscoverHierarchyMethod())
.addMethod(getWatchDeployEventsMethod())
.addMethod(getBrowseChildrenMethod())
.build();
}
}
@@ -0,0 +1,105 @@
package com.zb.mom.ww.mxgateway.client;
import java.util.Collections;
import java.util.List;
/**
* Filters and shape options for {@link GalaxyRepositoryClient#browse(BrowseChildrenOptions)}.
* Mirror of the existing DiscoverHierarchy options for the lazy-browse path.
*
* <p>All filter fields are AND-combined server-side. Empty / unset fields disable
* that filter. The {@code includeAttributes} tri-state uses {@code null} to mean
* "let the server use its default"; non-{@code null} forwards the explicit flag.
*/
public final class BrowseChildrenOptions {
private final List<Integer> categoryIds;
private final List<String> templateChainContains;
private final String tagNameGlob;
private final Boolean includeAttributes;
private final boolean alarmBearingOnly;
private final boolean historizedOnly;
private BrowseChildrenOptions(Builder b) {
this.categoryIds = List.copyOf(b.categoryIds);
this.templateChainContains = List.copyOf(b.templateChainContains);
this.tagNameGlob = b.tagNameGlob;
this.includeAttributes = b.includeAttributes;
this.alarmBearingOnly = b.alarmBearingOnly;
this.historizedOnly = b.historizedOnly;
}
/** @return immutable list of category IDs to include; empty disables this filter. */
public List<Integer> getCategoryIds() { return categoryIds; }
/** @return immutable list of template names that must appear in each child's template chain. */
public List<String> getTemplateChainContains() { return templateChainContains; }
/** @return SQL-LIKE-style glob applied to {@code tag_name}; empty disables. */
public String getTagNameGlob() { return tagNameGlob; }
/** @return tri-state override for {@code include_attributes}; {@code null} keeps the server default. */
public Boolean getIncludeAttributes() { return includeAttributes; }
/** @return restrict to alarm-bearing objects. */
public boolean isAlarmBearingOnly() { return alarmBearingOnly; }
/** @return restrict to objects with at least one historized attribute. */
public boolean isHistorizedOnly() { return historizedOnly; }
/** @return a fresh builder. */
public static Builder builder() { return new Builder(); }
/** @return options with every filter disabled and {@code includeAttributes} unset. */
public static BrowseChildrenOptions empty() { return builder().build(); }
/** Fluent builder for {@link BrowseChildrenOptions}. */
public static final class Builder {
private List<Integer> categoryIds = Collections.emptyList();
private List<String> templateChainContains = Collections.emptyList();
private String tagNameGlob = "";
private Boolean includeAttributes = null;
private boolean alarmBearingOnly = false;
private boolean historizedOnly = false;
/** Sets the category-id filter. */
public Builder categoryIds(List<Integer> v) {
this.categoryIds = v == null ? Collections.emptyList() : v;
return this;
}
/** Sets the template-chain-contains filter. */
public Builder templateChainContains(List<String> v) {
this.templateChainContains = v == null ? Collections.emptyList() : v;
return this;
}
/** Sets the tag-name glob. */
public Builder tagNameGlob(String v) {
this.tagNameGlob = v == null ? "" : v;
return this;
}
/** Sets the tri-state {@code includeAttributes} override; {@code null} keeps the server default. */
public Builder includeAttributes(Boolean v) {
this.includeAttributes = v;
return this;
}
/** Toggles the alarm-bearing-only filter. */
public Builder alarmBearingOnly(boolean v) {
this.alarmBearingOnly = v;
return this;
}
/** Toggles the historized-only filter. */
public Builder historizedOnly(boolean v) {
this.historizedOnly = v;
return this;
}
/** Builds the immutable options. */
public BrowseChildrenOptions build() {
return new BrowseChildrenOptions(this);
}
}
}
@@ -4,6 +4,8 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import galaxy_repository.v1.GalaxyRepositoryGrpc;
import galaxy_repository.v1.GalaxyRepositoryOuterClass.BrowseChildrenReply;
import galaxy_repository.v1.GalaxyRepositoryOuterClass.BrowseChildrenRequest;
import galaxy_repository.v1.GalaxyRepositoryOuterClass.DeployEvent;
import galaxy_repository.v1.GalaxyRepositoryOuterClass.DiscoverHierarchyReply;
import galaxy_repository.v1.GalaxyRepositoryOuterClass.DiscoverHierarchyRequest;
@@ -37,6 +39,7 @@ import javax.net.ssl.SSLException;
*/
public final class GalaxyRepositoryClient implements AutoCloseable {
private static final int DISCOVER_HIERARCHY_PAGE_SIZE = 5000;
private static final int BROWSE_CHILDREN_PAGE_SIZE = 500;
private final ManagedChannel ownedChannel;
private final MxGatewayClientOptions options;
@@ -213,6 +216,98 @@ public final class GalaxyRepositoryClient implements AutoCloseable {
return discoverHierarchyPageAsync("", new java.util.ArrayList<>(), new java.util.HashSet<>());
}
/**
* Lazy-browse entry point: fetches the root layer of the Galaxy hierarchy.
* Each returned {@link LazyBrowseNode} can be expanded on demand via
* {@link LazyBrowseNode#expand()} to load its direct children.
*
* @return the root nodes (no parent selector) with default options
* @throws MxGatewayException on transport or protocol failure
*/
public List<LazyBrowseNode> browse() {
return browse(null);
}
/**
* Lazy-browse entry point with caller-supplied filters / shape.
*
* @param options filter and shape options; {@code null} means {@link BrowseChildrenOptions#empty()}
* @return the root nodes matching the options
* @throws MxGatewayException on transport or protocol failure
*/
public List<LazyBrowseNode> browse(BrowseChildrenOptions options) {
BrowseChildrenOptions effective = options == null ? BrowseChildrenOptions.empty() : options;
return browseChildrenInner(null, effective);
}
/**
* Issues a single {@code BrowseChildren} RPC and returns the raw reply.
* Callers wanting full control over pagination can drive the loop themselves.
*
* @param request the request to send
* @return the reply
* @throws MxGatewayException on transport or protocol failure
*/
public BrowseChildrenReply browseChildrenRaw(BrowseChildrenRequest request) {
try {
return rawBlockingStub().browseChildren(request);
} catch (RuntimeException error) {
if (error instanceof MxGatewayException) {
throw error;
}
throw MxGatewayErrors.fromGrpc("galaxy browse children", error);
}
}
/**
* Drives the BrowseChildren paging loop for a single parent (or roots when
* {@code parentGobjectId} is {@code null}). Detects repeated page tokens to
* avoid infinite loops on a buggy server.
*/
List<LazyBrowseNode> browseChildrenInner(Integer parentGobjectId, BrowseChildrenOptions options) {
java.util.ArrayList<LazyBrowseNode> nodes = new java.util.ArrayList<>();
java.util.HashSet<String> seenPageTokens = new java.util.HashSet<>();
String pageToken = "";
while (true) {
BrowseChildrenRequest.Builder builder = BrowseChildrenRequest.newBuilder()
.setPageSize(BROWSE_CHILDREN_PAGE_SIZE)
.setPageToken(pageToken)
.setAlarmBearingOnly(options.isAlarmBearingOnly())
.setHistorizedOnly(options.isHistorizedOnly());
if (parentGobjectId != null) {
builder.setParentGobjectId(parentGobjectId.intValue());
}
if (!options.getCategoryIds().isEmpty()) {
builder.addAllCategoryIds(options.getCategoryIds());
}
if (!options.getTemplateChainContains().isEmpty()) {
builder.addAllTemplateChainContains(options.getTemplateChainContains());
}
if (!options.getTagNameGlob().isEmpty()) {
builder.setTagNameGlob(options.getTagNameGlob());
}
if (options.getIncludeAttributes() != null) {
builder.setIncludeAttributes(options.getIncludeAttributes());
}
BrowseChildrenReply reply = browseChildrenRaw(builder.build());
for (int i = 0; i < reply.getChildrenCount(); i++) {
boolean hint = i < reply.getChildHasChildrenCount() && reply.getChildHasChildren(i);
nodes.add(new LazyBrowseNode(this, reply.getChildren(i), hint, options));
}
pageToken = reply.getNextPageToken();
if (pageToken == null || pageToken.isEmpty()) {
return nodes;
}
if (!seenPageTokens.add(pageToken)) {
throw new MxGatewayException(
"galaxy browse children returned repeated page token: " + pageToken);
}
}
}
/**
* Subscribes to {@code WatchDeployEvents} via the async stub and consumes
* results through a blocking iterator. Closing the returned stream cancels
@@ -0,0 +1,75 @@
package com.zb.mom.ww.mxgateway.client;
import galaxy_repository.v1.GalaxyRepositoryOuterClass.GalaxyObject;
import java.util.Collections;
import java.util.List;
/**
* One node in a lazy-loaded Galaxy browse tree. Holds the underlying
* {@link GalaxyObject} and exposes {@link #expand()} to fetch its direct
* children on demand. Expansion is one-shot: a second call is a no-op.
* Pagination of large sibling sets is handled internally by the client.
*/
public final class LazyBrowseNode {
private final GalaxyRepositoryClient client;
private final GalaxyObject object;
private final boolean hasChildrenHint;
private final BrowseChildrenOptions options;
private final Object lock = new Object();
private List<LazyBrowseNode> children = Collections.emptyList();
private boolean isExpanded;
LazyBrowseNode(
GalaxyRepositoryClient client,
GalaxyObject object,
boolean hasChildrenHint,
BrowseChildrenOptions options) {
this.client = client;
this.object = object;
this.hasChildrenHint = hasChildrenHint;
this.options = options;
}
/** @return the underlying Galaxy object proto for this node. */
public GalaxyObject getObject() {
return object;
}
/** @return {@code true} when the server reports this node has at least one matching descendant. */
public boolean hasChildrenHint() {
return hasChildrenHint;
}
/** @return a snapshot of direct children loaded by {@link #expand()}; empty until then. */
public List<LazyBrowseNode> getChildren() {
synchronized (lock) {
return List.copyOf(children);
}
}
/** @return {@code true} after the first {@link #expand()} call completes. */
public boolean isExpanded() {
synchronized (lock) {
return isExpanded;
}
}
/**
* Fetches direct children from the gateway and populates {@link #getChildren()}.
* Idempotent: subsequent calls are no-ops and do not issue a second RPC.
*
* @throws MxGatewayException on transport or protocol failure
*/
public void expand() {
synchronized (lock) {
if (isExpanded) {
return;
}
List<LazyBrowseNode> loaded =
client.browseChildrenInner(Integer.valueOf(object.getGobjectId()), options);
this.children = loaded;
this.isExpanded = true;
}
}
}
@@ -8,6 +8,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import com.google.protobuf.Timestamp;
import galaxy_repository.v1.GalaxyRepositoryGrpc;
import galaxy_repository.v1.GalaxyRepositoryOuterClass.BrowseChildrenReply;
import galaxy_repository.v1.GalaxyRepositoryOuterClass.BrowseChildrenRequest;
import galaxy_repository.v1.GalaxyRepositoryOuterClass.DeployEvent;
import galaxy_repository.v1.GalaxyRepositoryOuterClass.DiscoverHierarchyReply;
import galaxy_repository.v1.GalaxyRepositoryOuterClass.DiscoverHierarchyRequest;
@@ -24,6 +26,7 @@ import io.grpc.Server;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.ClientCallStreamObserver;
@@ -31,9 +34,13 @@ import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -306,6 +313,209 @@ final class GalaxyRepositoryClientTests {
}
}
@Test
void browseNoParentReturnsRoots() throws Exception {
BrowseChildrenService service = new BrowseChildrenService();
service.replies.add(browseReply(
List.of(obj(1, "Plant", true), obj(2, "Other", false)),
List.of(true, false),
1L,
""));
try (InProcessGalaxy g = InProcessGalaxy.start(service, new AtomicReference<>());
GalaxyRepositoryClient client = g.client("")) {
List<LazyBrowseNode> roots = client.browse();
assertEquals(2, roots.size());
assertEquals("Plant", roots.get(0).getObject().getTagName());
assertTrue(roots.get(0).hasChildrenHint());
assertFalse(roots.get(0).isExpanded());
assertEquals("Other", roots.get(1).getObject().getTagName());
assertFalse(roots.get(1).hasChildrenHint());
assertFalse(roots.get(1).isExpanded());
assertEquals(1, service.calls.size());
assertFalse(service.calls.get(0).hasParentGobjectId());
}
}
@Test
void browseExpandPopulatesChildrenAndMarksExpanded() throws Exception {
BrowseChildrenService service = new BrowseChildrenService();
service.replies.add(browseReply(
List.of(obj(1, "Plant", true)),
List.of(true),
1L,
""));
service.replies.add(browseReply(
List.of(obj(10, "Line1", false)),
List.of(false),
1L,
""));
try (InProcessGalaxy g = InProcessGalaxy.start(service, new AtomicReference<>());
GalaxyRepositoryClient client = g.client("")) {
List<LazyBrowseNode> roots = client.browse();
roots.get(0).expand();
assertTrue(roots.get(0).isExpanded());
assertEquals(1, roots.get(0).getChildren().size());
assertEquals("Line1", roots.get(0).getChildren().get(0).getObject().getTagName());
assertEquals(2, service.calls.size());
assertTrue(service.calls.get(1).hasParentGobjectId());
assertEquals(1, service.calls.get(1).getParentGobjectId());
}
}
@Test
void browseExpandIdempotentNoSecondRpc() throws Exception {
BrowseChildrenService service = new BrowseChildrenService();
service.replies.add(browseReply(
List.of(obj(1, "Plant", true)),
List.of(true),
1L,
""));
service.replies.add(browseReply(
List.of(obj(10, "Line1", false)),
List.of(false),
1L,
""));
try (InProcessGalaxy g = InProcessGalaxy.start(service, new AtomicReference<>());
GalaxyRepositoryClient client = g.client("")) {
List<LazyBrowseNode> roots = client.browse();
roots.get(0).expand();
roots.get(0).expand();
assertEquals(2, service.calls.size());
assertEquals(1, roots.get(0).getChildren().size());
}
}
@Test
void browseExpandUnknownParentThrowsGalaxyNotFound() throws Exception {
BrowseChildrenService service = new BrowseChildrenService();
service.replies.add(browseReply(
List.of(obj(1, "Plant", true)),
List.of(true),
1L,
""));
service.errors.add(Status.NOT_FOUND.withDescription("Parent not found").asRuntimeException());
try (InProcessGalaxy g = InProcessGalaxy.start(service, new AtomicReference<>());
GalaxyRepositoryClient client = g.client("")) {
List<LazyBrowseNode> roots = client.browse();
MxGatewayException error = assertThrows(MxGatewayException.class, () -> roots.get(0).expand());
assertTrue(
error.getMessage().toLowerCase().contains("not found"),
"expected message to mention 'not found', got: " + error.getMessage());
}
}
@Test
void browseExpandMultiPageGathersAllPages() throws Exception {
BrowseChildrenService service = new BrowseChildrenService();
// Roots
service.replies.add(browseReply(
List.of(obj(7, "Plant", true)),
List.of(true),
1L,
""));
// First child page with a next token
service.replies.add(browseReply(
List.of(obj(70, "ChildA", false), obj(71, "ChildB", false)),
List.of(false, false),
1L,
"7:abc:2"));
// Second child page closes the loop
service.replies.add(browseReply(
List.of(obj(72, "ChildC", false)),
List.of(false),
1L,
""));
try (InProcessGalaxy g = InProcessGalaxy.start(service, new AtomicReference<>());
GalaxyRepositoryClient client = g.client("")) {
List<LazyBrowseNode> roots = client.browse();
roots.get(0).expand();
assertEquals(3, roots.get(0).getChildren().size());
assertEquals(3, service.calls.size());
assertEquals("7:abc:2", service.calls.get(2).getPageToken());
}
}
@Test
void browseWithFilterForwardsToRequest() throws Exception {
BrowseChildrenService service = new BrowseChildrenService();
// Default reply is empty; only the request shape matters here.
try (InProcessGalaxy g = InProcessGalaxy.start(service, new AtomicReference<>());
GalaxyRepositoryClient client = g.client("")) {
client.browse(BrowseChildrenOptions.builder()
.tagNameGlob("Mixer*")
.alarmBearingOnly(true)
.build());
}
assertEquals(1, service.calls.size());
BrowseChildrenRequest request = service.calls.get(0);
assertEquals("Mixer*", request.getTagNameGlob());
assertTrue(request.getAlarmBearingOnly());
}
private static GalaxyObject obj(int id, String tag, boolean isArea) {
return GalaxyObject.newBuilder()
.setGobjectId(id)
.setTagName(tag)
.setBrowseName(tag)
.setIsArea(isArea)
.build();
}
private static BrowseChildrenReply browseReply(
List<GalaxyObject> children,
List<Boolean> childHasChildren,
long cacheSequence,
String nextPageToken) {
BrowseChildrenReply.Builder b = BrowseChildrenReply.newBuilder()
.setTotalChildCount(children.size())
.setCacheSequence(cacheSequence)
.setNextPageToken(nextPageToken);
b.addAllChildren(children);
b.addAllChildHasChildren(childHasChildren);
return b.build();
}
private static final class BrowseChildrenService extends TestService {
final List<BrowseChildrenRequest> calls =
Collections.synchronizedList(new CopyOnWriteArrayList<>());
final Queue<BrowseChildrenReply> replies = new ArrayDeque<>();
final Queue<Throwable> errors = new ArrayDeque<>();
@Override
public void browseChildren(
BrowseChildrenRequest request, StreamObserver<BrowseChildrenReply> responseObserver) {
calls.add(request);
BrowseChildrenReply reply;
Throwable err;
synchronized (this) {
// Prefer queued replies first; once they're exhausted, fall through to any
// queued error. This matches the .NET fake's ordering used by parity tests.
reply = replies.poll();
err = reply == null ? errors.poll() : null;
}
if (err != null) {
responseObserver.onError(err);
return;
}
if (reply == null) {
reply = BrowseChildrenReply.getDefaultInstance();
}
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}
private abstract static class TestService extends GalaxyRepositoryGrpc.GalaxyRepositoryImplBase {
@Override
public void testConnection(
@@ -21,9 +21,10 @@ from .auth import merge_metadata
from .errors import MxGatewayError, map_rpc_error
from .generated import galaxy_repository_pb2 as galaxy_pb
from .generated import galaxy_repository_pb2_grpc as galaxy_pb_grpc
from .options import ClientOptions, create_channel
from .options import BrowseChildrenOptions, ClientOptions, create_channel
_DISCOVER_HIERARCHY_PAGE_SIZE = 5000
_BROWSE_CHILDREN_PAGE_SIZE = 500
class GalaxyRepositoryClient:
@@ -139,6 +140,73 @@ class GalaxyRepositoryClient:
)
seen_page_tokens.add(page_token)
async def browse(
self,
options: BrowseChildrenOptions | None = None,
) -> list["LazyBrowseNode"]:
"""Return the root browse nodes for lazy hierarchy traversal.
Each returned ``LazyBrowseNode`` wraps a Galaxy object whose direct
children can be loaded on demand by ``await node.expand()``.
"""
effective = options or BrowseChildrenOptions()
return [
node
async for node in self._iter_browse_children(
parent_gobject_id=None,
options=effective,
)
]
async def _iter_browse_children(
self,
*,
parent_gobject_id: int | None,
options: BrowseChildrenOptions,
) -> AsyncIterator["LazyBrowseNode"]:
page_token = ""
seen_page_tokens: set[str] = set()
while True:
request = galaxy_pb.BrowseChildrenRequest(
page_size=_BROWSE_CHILDREN_PAGE_SIZE,
page_token=page_token,
alarm_bearing_only=options.alarm_bearing_only,
historized_only=options.historized_only,
)
if parent_gobject_id is not None:
request.parent_gobject_id = parent_gobject_id
if options.category_ids:
request.category_ids.extend(options.category_ids)
if options.template_chain_contains:
request.template_chain_contains.extend(options.template_chain_contains)
if options.tag_name_glob:
request.tag_name_glob = options.tag_name_glob
if options.include_attributes is not None:
request.include_attributes = options.include_attributes
reply = await self._unary(
"browse children",
self.raw_stub.BrowseChildren,
request,
)
for index, obj in enumerate(reply.children):
hint = (
index < len(reply.child_has_children)
and bool(reply.child_has_children[index])
)
yield LazyBrowseNode(self, obj, hint, options)
page_token = reply.next_page_token
if not page_token:
return
if page_token in seen_page_tokens:
raise MxGatewayError(
f"galaxy browse children returned repeated page token {page_token!r}"
)
seen_page_tokens.add(page_token)
def watch_deploy_events(
self,
last_seen_deploy_time: datetime | None = None,
@@ -202,6 +270,63 @@ class GalaxyRepositoryClient:
raise map_rpc_error(operation, error) from error
class LazyBrowseNode:
"""One node in a lazy-loaded Galaxy browse tree.
Calling ``expand`` once fetches direct children (paginating as needed)
and populates ``children``. Subsequent calls are no-ops so callers can
drive UI expand toggles without de-duping.
"""
def __init__(
self,
client: "GalaxyRepositoryClient",
obj: galaxy_pb.GalaxyObject,
has_children_hint: bool,
options: BrowseChildrenOptions,
) -> None:
"""Initialize a node bound to its owning client and filter set."""
self._client = client
self._object = obj
self._has_children_hint = has_children_hint
self._options = options
self._children: list[LazyBrowseNode] = []
self._is_expanded = False
@property
def object(self) -> galaxy_pb.GalaxyObject:
"""Return the underlying ``GalaxyObject`` proto for this node."""
return self._object
@property
def has_children_hint(self) -> bool:
"""Return the server hint about whether this node has children."""
return self._has_children_hint
@property
def children(self) -> list["LazyBrowseNode"]:
"""Return a copy of the loaded child nodes (empty until expanded)."""
return list(self._children)
@property
def is_expanded(self) -> bool:
"""Return whether ``expand`` has already populated ``children``."""
return self._is_expanded
async def expand(self) -> None:
"""Fetch direct children of this node; no-op on subsequent calls."""
if self._is_expanded:
return
new_children: list[LazyBrowseNode] = []
async for child in self._client._iter_browse_children(
parent_gobject_id=self._object.gobject_id,
options=self._options,
):
new_children.append(child)
self._children.extend(new_children)
self._is_expanded = True
async def _canceling_iterator(call: Any) -> AsyncIterator[galaxy_pb.DeployEvent]:
try:
async for event in call:
@@ -2,7 +2,8 @@
from __future__ import annotations
from dataclasses import dataclass
from collections.abc import Sequence
from dataclasses import dataclass, field
from pathlib import Path
import grpc
@@ -51,6 +52,23 @@ class ClientOptions:
)
@dataclass(frozen=True)
class BrowseChildrenOptions:
"""Filters and shape options for ``GalaxyRepositoryClient.browse``.
Mirrors the AND-combined filter set on ``BrowseChildrenRequest`` so a
single instance can be re-used across an entire lazy browse session
(the filter set is part of the page-token contract).
"""
category_ids: Sequence[int] = field(default_factory=tuple)
template_chain_contains: Sequence[str] = field(default_factory=tuple)
tag_name_glob: str | None = None
include_attributes: bool | None = None
alarm_bearing_only: bool = False
historized_only: bool = False
def create_channel(options: ClientOptions) -> grpc.aio.Channel:
"""Create a plaintext or TLS `grpc.aio` channel from client options."""
+224
View File
@@ -6,12 +6,16 @@ import asyncio
from datetime import datetime, timezone
from typing import Any
import grpc
import pytest
from google.protobuf.timestamp_pb2 import Timestamp
from zb_mom_ww_mxgateway import ClientOptions, DeployEvent, GalaxyRepositoryClient, WatchDeployEventsRequest
from zb_mom_ww_mxgateway.errors import MxGatewayError
from zb_mom_ww_mxgateway.galaxy import LazyBrowseNode
from zb_mom_ww_mxgateway.generated import galaxy_repository_pb2 as galaxy_pb
from zb_mom_ww_mxgateway.generated import galaxy_repository_pb2_grpc as galaxy_pb_grpc
from zb_mom_ww_mxgateway.options import BrowseChildrenOptions
def test_galaxy_messages_import() -> None:
@@ -268,15 +272,230 @@ async def test_close_marks_channel_closed_when_no_real_channel() -> None:
await client.close()
def _obj(gid: int, tag: str, is_area: bool = False) -> galaxy_pb.GalaxyObject:
return galaxy_pb.GalaxyObject(
gobject_id=gid, tag_name=tag, browse_name=tag, is_area=is_area,
)
def _build_browse_reply(
children: list[galaxy_pb.GalaxyObject],
child_has_children: list[bool],
cache_sequence: int,
next_page_token: str = "",
) -> galaxy_pb.BrowseChildrenReply:
reply = galaxy_pb.BrowseChildrenReply(
total_child_count=len(children),
cache_sequence=cache_sequence,
next_page_token=next_page_token,
)
reply.children.extend(children)
reply.child_has_children.extend(child_has_children)
return reply
def _fake_aio_rpc_error(code: grpc.StatusCode, details: str) -> grpc.aio.AioRpcError:
return grpc.aio.AioRpcError(
code=code,
initial_metadata=grpc.aio.Metadata(),
trailing_metadata=grpc.aio.Metadata(),
details=details,
)
@pytest.mark.asyncio
async def test_browse_no_parent_returns_roots() -> None:
stub = FakeGalaxyStub()
stub.browse_children.replies = [
_build_browse_reply(
children=[_obj(1, "Area_A", is_area=True), _obj(2, "Area_B", is_area=True)],
child_has_children=[True, False],
cache_sequence=7,
),
]
client = await GalaxyRepositoryClient.connect(
ClientOptions(endpoint="fake", plaintext=True),
stub=stub,
)
roots = await client.browse()
assert len(roots) == 2
assert all(isinstance(node, LazyBrowseNode) for node in roots)
assert roots[0].object.tag_name == "Area_A"
assert roots[0].has_children_hint is True
assert roots[1].has_children_hint is False
assert roots[0].is_expanded is False
request = stub.browse_children.requests[0]
assert request.WhichOneof("parent") is None
assert request.page_size == 500
assert request.page_token == ""
@pytest.mark.asyncio
async def test_browse_expand_populates_children_and_marks_expanded() -> None:
stub = FakeGalaxyStub()
stub.browse_children.replies = [
_build_browse_reply(
children=[_obj(1, "Area_A", is_area=True)],
child_has_children=[True],
cache_sequence=1,
),
_build_browse_reply(
children=[_obj(11, "Child_A"), _obj(12, "Child_B")],
child_has_children=[False, False],
cache_sequence=1,
),
]
client = await GalaxyRepositoryClient.connect(
ClientOptions(endpoint="fake", plaintext=True),
stub=stub,
)
roots = await client.browse()
await roots[0].expand()
assert roots[0].is_expanded is True
assert [n.object.tag_name for n in roots[0].children] == ["Child_A", "Child_B"]
assert len(stub.browse_children.requests) == 2
expand_request = stub.browse_children.requests[1]
assert expand_request.WhichOneof("parent") == "parent_gobject_id"
assert expand_request.parent_gobject_id == 1
@pytest.mark.asyncio
async def test_browse_expand_idempotent_no_second_rpc() -> None:
stub = FakeGalaxyStub()
stub.browse_children.replies = [
_build_browse_reply(
children=[_obj(1, "Area_A", is_area=True)],
child_has_children=[True],
cache_sequence=1,
),
_build_browse_reply(
children=[_obj(11, "Child_A")],
child_has_children=[False],
cache_sequence=1,
),
]
client = await GalaxyRepositoryClient.connect(
ClientOptions(endpoint="fake", plaintext=True),
stub=stub,
)
roots = await client.browse()
await roots[0].expand()
await roots[0].expand()
assert len(stub.browse_children.requests) == 2
assert len(roots[0].children) == 1
@pytest.mark.asyncio
async def test_browse_expand_unknown_parent_raises_mxgateway_error() -> None:
stub = FakeGalaxyStub()
stub.browse_children.replies = [
_build_browse_reply(
children=[_obj(99, "Stale_Parent", is_area=True)],
child_has_children=[True],
cache_sequence=1,
),
]
stub.browse_children.exceptions = [
None,
_fake_aio_rpc_error(grpc.StatusCode.NOT_FOUND, "parent not found"),
]
client = await GalaxyRepositoryClient.connect(
ClientOptions(endpoint="fake", plaintext=True),
stub=stub,
)
roots = await client.browse()
with pytest.raises(MxGatewayError):
await roots[0].expand()
@pytest.mark.asyncio
async def test_browse_expand_multi_page_gathers_all_pages() -> None:
stub = FakeGalaxyStub()
stub.browse_children.replies = [
_build_browse_reply(
children=[_obj(7, "Area_Big", is_area=True)],
child_has_children=[True],
cache_sequence=2,
),
_build_browse_reply(
children=[_obj(71, "Child_1"), _obj(72, "Child_2")],
child_has_children=[False, False],
cache_sequence=2,
next_page_token="7:abc:2",
),
_build_browse_reply(
children=[_obj(73, "Child_3")],
child_has_children=[False],
cache_sequence=2,
),
]
client = await GalaxyRepositoryClient.connect(
ClientOptions(endpoint="fake", plaintext=True),
stub=stub,
)
roots = await client.browse()
await roots[0].expand()
assert [n.object.tag_name for n in roots[0].children] == ["Child_1", "Child_2", "Child_3"]
assert len(stub.browse_children.requests) == 3
assert stub.browse_children.requests[2].page_token == "7:abc:2"
assert stub.browse_children.requests[2].parent_gobject_id == 7
@pytest.mark.asyncio
async def test_browse_with_filter_forwards_to_request() -> None:
stub = FakeGalaxyStub()
stub.browse_children.replies = [
_build_browse_reply(
children=[_obj(1, "Area_A", is_area=True)],
child_has_children=[False],
cache_sequence=3,
),
]
client = await GalaxyRepositoryClient.connect(
ClientOptions(endpoint="fake", plaintext=True),
stub=stub,
)
options = BrowseChildrenOptions(
category_ids=(4, 5),
template_chain_contains=("$DelmiaReceiver",),
tag_name_glob="Area_*",
include_attributes=True,
alarm_bearing_only=True,
historized_only=True,
)
await client.browse(options)
request = stub.browse_children.requests[0]
assert list(request.category_ids) == [4, 5]
assert list(request.template_chain_contains) == ["$DelmiaReceiver"]
assert request.tag_name_glob == "Area_*"
assert request.HasField("include_attributes")
assert request.include_attributes is True
assert request.alarm_bearing_only is True
assert request.historized_only is True
class FakeGalaxyStub:
def __init__(self) -> None:
self.test_connection = FakeUnary([galaxy_pb.TestConnectionReply(ok=False)])
self.get_last_deploy_time = FakeUnary([galaxy_pb.GetLastDeployTimeReply(present=False)])
self.discover_hierarchy = FakeUnary([galaxy_pb.DiscoverHierarchyReply()])
self.browse_children = FakeUnary([galaxy_pb.BrowseChildrenReply()])
self.watch_deploy_events = FakeStream([])
self.TestConnection = self.test_connection
self.GetLastDeployTime = self.get_last_deploy_time
self.DiscoverHierarchy = self.discover_hierarchy
self.BrowseChildren = self.browse_children
@property
def WatchDeployEvents(self) -> "FakeStream": # noqa: N802 — gRPC naming
@@ -287,6 +506,7 @@ class FakeUnary:
def __init__(self, replies: list[Any]) -> None:
self.replies = replies
self.requests: list[Any] = []
self.exceptions: list[BaseException] = []
self.metadata: tuple[tuple[str, str], ...] | None = None
async def __call__(
@@ -298,6 +518,10 @@ class FakeUnary:
) -> Any:
self.requests.append(request)
self.metadata = metadata
if self.exceptions:
exc = self.exceptions.pop(0)
if exc is not None:
raise exc
return self.replies.pop(0)
+527 -4
View File
@@ -5,9 +5,12 @@
//! read-only RPCs as Rust async methods. Generated Galaxy proto types are
//! re-exported through [`crate::generated::galaxy_repository::v1`].
use std::collections::HashSet;
use std::fs;
use std::sync::Arc;
use prost_types::Timestamp;
use tokio::sync::Mutex as AsyncMutex;
use tonic::codegen::InterceptedService;
use tonic::transport::{Certificate, Channel, ClientTlsConfig};
use tonic::Request;
@@ -16,12 +19,130 @@ use crate::auth::AuthInterceptor;
use crate::error::Error;
use crate::generated::galaxy_repository::v1::galaxy_repository_client::GalaxyRepositoryClient;
use crate::generated::galaxy_repository::v1::{
DeployEvent, DiscoverHierarchyRequest, GalaxyObject, GetLastDeployTimeRequest,
TestConnectionRequest, WatchDeployEventsRequest,
browse_children_request, BrowseChildrenReply, BrowseChildrenRequest, DeployEvent,
DiscoverHierarchyRequest, GalaxyObject, GetLastDeployTimeRequest, TestConnectionRequest,
WatchDeployEventsRequest,
};
use crate::options::ClientOptions;
const DISCOVER_HIERARCHY_PAGE_SIZE: i32 = 5000;
const BROWSE_CHILDREN_PAGE_SIZE: i32 = 500;
/// Optional filter set forwarded to `GalaxyRepository.BrowseChildren`.
///
/// Mirrors the request-level filters on the wire: combined with AND so a child
/// only appears when it satisfies every populated criterion. Construct via
/// [`BrowseChildrenOptions::default`] and tweak the fields you care about.
#[derive(Debug, Clone, Default)]
pub struct BrowseChildrenOptions {
/// Restrict to objects whose `category_id` matches one of the supplied
/// Galaxy category identifiers. Empty means "no restriction".
pub category_ids: Vec<i32>,
/// Restrict to objects whose template chain contains every supplied
/// template name (case-sensitive substring match on each entry).
pub template_chain_contains: Vec<String>,
/// Restrict to objects whose tag name matches the supplied glob (SQL
/// `LIKE`-style on the server). `None` means "no glob filter".
pub tag_name_glob: Option<String>,
/// Optional tri-state hint for whether to populate `GalaxyObject.attributes`
/// on returned children. `None` falls back to the server default.
pub include_attributes: Option<bool>,
/// When `true`, only return children that own at least one alarm-bearing
/// attribute (matches `DiscoverHierarchy` semantics).
pub alarm_bearing_only: bool,
/// When `true`, only return children that own at least one historized
/// attribute (matches `DiscoverHierarchy` semantics).
pub historized_only: bool,
}
/// Lazy hierarchy node used by the walker built on top of `BrowseChildren`.
///
/// A node owns its [`GalaxyObject`], a hint as to whether the server believes
/// it has at least one matching descendant under the active filter set, and an
/// internal `expanded` flag protected by an async mutex. Calling [`expand`]
/// the first time issues a paged `BrowseChildren` RPC; subsequent calls are
/// no-ops so callers can poll without re-hitting the server.
///
/// `LazyBrowseNode` is cheap to clone — clones share state through an
/// internal `Arc`, so expanding one clone makes the children visible to every
/// other clone.
///
/// [`expand`]: LazyBrowseNode::expand
pub struct LazyBrowseNode {
inner: Arc<LazyBrowseNodeInner>,
}
impl Clone for LazyBrowseNode {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}
struct LazyBrowseNodeInner {
client: GalaxyClient,
object: GalaxyObject,
has_children_hint: bool,
options: BrowseChildrenOptions,
state: AsyncMutex<LazyBrowseNodeState>,
}
struct LazyBrowseNodeState {
children: Vec<LazyBrowseNode>,
is_expanded: bool,
}
impl LazyBrowseNode {
/// Borrow the [`GalaxyObject`] returned by the server for this node.
pub fn object(&self) -> &GalaxyObject {
&self.inner.object
}
/// Server-supplied hint: `true` when the child likely has at least one
/// further matching descendant. Useful to decide whether a UI should draw
/// an expand triangle without issuing the RPC up front.
pub fn has_children_hint(&self) -> bool {
self.inner.has_children_hint
}
/// Snapshot of the currently-known children. Empty until [`expand`] has
/// run at least once.
///
/// [`expand`]: LazyBrowseNode::expand
pub async fn children(&self) -> Vec<LazyBrowseNode> {
self.inner.state.lock().await.children.clone()
}
/// Returns `true` once [`expand`] has populated this node's children.
///
/// [`expand`]: LazyBrowseNode::expand
pub async fn is_expanded(&self) -> bool {
self.inner.state.lock().await.is_expanded
}
/// Populate this node's children by issuing a paged `BrowseChildren` RPC.
/// Subsequent calls are no-ops — the cached children stay in place and no
/// additional RPC is issued.
pub async fn expand(&self) -> Result<(), Error> {
let mut state = self.inner.state.lock().await;
if state.is_expanded {
return Ok(());
}
let mut client = self.inner.client.clone();
let new_children = client
.browse_children_inner(
Some(self.inner.object.gobject_id),
self.inner.options.clone(),
)
.await?;
state.children = new_children;
state.is_expanded = true;
Ok(())
}
}
/// Convenience alias for the generated Galaxy client wrapped in the
/// authentication interceptor.
@@ -172,6 +293,99 @@ impl GalaxyClient {
}
}
/// Browse the top-level (root) objects of the hierarchy as
/// [`LazyBrowseNode`] instances. Pass [`BrowseChildrenOptions`] to
/// restrict the result set; the same filter is reused when callers expand
/// any returned node.
pub async fn browse(
&mut self,
options: Option<BrowseChildrenOptions>,
) -> Result<Vec<LazyBrowseNode>, Error> {
let effective = options.unwrap_or_default();
self.browse_children_inner(None, effective).await
}
/// Issue a single `BrowseChildren` RPC and return the raw reply. Callers
/// that want to drive paging themselves (or inspect the cache sequence)
/// use this; high-level walking goes through [`browse`] and
/// [`LazyBrowseNode::expand`].
///
/// [`browse`]: GalaxyClient::browse
pub async fn browse_children_raw(
&mut self,
request: BrowseChildrenRequest,
) -> Result<BrowseChildrenReply, Error> {
let response = self
.inner
.browse_children(self.unary_request(request))
.await?;
Ok(response.into_inner())
}
pub(crate) async fn browse_children_inner(
&mut self,
parent_gobject_id: Option<i32>,
options: BrowseChildrenOptions,
) -> Result<Vec<LazyBrowseNode>, Error> {
let mut nodes = Vec::new();
let mut page_token = String::new();
let mut seen_page_tokens: HashSet<String> = HashSet::new();
loop {
let parent = parent_gobject_id.map(browse_children_request::Parent::ParentGobjectId);
let request = BrowseChildrenRequest {
page_size: BROWSE_CHILDREN_PAGE_SIZE,
page_token: page_token.clone(),
category_ids: options.category_ids.clone(),
template_chain_contains: options.template_chain_contains.clone(),
tag_name_glob: options.tag_name_glob.clone().unwrap_or_default(),
include_attributes: options.include_attributes,
alarm_bearing_only: options.alarm_bearing_only,
historized_only: options.historized_only,
parent,
};
let reply = self.browse_children_raw(request).await?;
let hints = reply.child_has_children;
for (index, object) in reply.children.into_iter().enumerate() {
let hint = hints.get(index).copied().unwrap_or(false);
nodes.push(self.make_lazy_node(object, hint, options.clone()));
}
page_token = reply.next_page_token;
if page_token.is_empty() {
return Ok(nodes);
}
if !seen_page_tokens.insert(page_token.clone()) {
return Err(Error::InvalidArgument {
name: "page_token".to_owned(),
detail: format!(
"galaxy browse children returned repeated page token `{page_token}`"
),
});
}
}
}
fn make_lazy_node(
&self,
object: GalaxyObject,
has_children_hint: bool,
options: BrowseChildrenOptions,
) -> LazyBrowseNode {
LazyBrowseNode {
inner: Arc::new(LazyBrowseNodeInner {
client: self.clone(),
object,
has_children_hint,
options,
state: AsyncMutex::new(LazyBrowseNodeState {
children: Vec::new(),
is_expanded: false,
}),
}),
}
}
/// Subscribe to the server-streamed deploy-event feed.
///
/// The server emits a bootstrap event describing the current cache state
@@ -250,6 +464,9 @@ mod tests {
objects: Mutex<Vec<GalaxyObject>>,
discover_requests: Mutex<Vec<DiscoverHierarchyRequest>>,
discover_replies: Mutex<std::collections::VecDeque<DiscoverHierarchyReply>>,
browse_children_calls: Mutex<Vec<BrowseChildrenRequest>>,
browse_children_replies: Mutex<std::collections::VecDeque<BrowseChildrenReply>>,
browse_children_errors: Mutex<Vec<Status>>,
watch_requests: Mutex<Vec<WatchDeployEventsRequest>>,
watch_events: Mutex<Vec<DeployEvent>>,
watch_senders: Mutex<Vec<DeployEventTx>>,
@@ -309,9 +526,24 @@ mod tests {
async fn browse_children(
&self,
_request: Request<BrowseChildrenRequest>,
request: Request<BrowseChildrenRequest>,
) -> Result<Response<BrowseChildrenReply>, Status> {
Err(Status::unimplemented("browse_children not implemented in FakeGalaxy"))
self.state
.browse_children_calls
.lock()
.unwrap()
.push(request.into_inner());
if let Some(error) = self.state.browse_children_errors.lock().unwrap().pop() {
return Err(error);
}
let reply = self
.state
.browse_children_replies
.lock()
.unwrap()
.pop_front()
.unwrap_or_default();
Ok(Response::new(reply))
}
type WatchDeployEventsStream =
@@ -703,4 +935,295 @@ mod tests {
"drop signal channel closed unexpectedly"
);
}
fn browse_obj(gid: i32, tag: &str, is_area: bool) -> GalaxyObject {
GalaxyObject {
gobject_id: gid,
tag_name: tag.to_owned(),
contained_name: String::new(),
browse_name: tag.to_owned(),
parent_gobject_id: 0,
is_area,
category_id: 0,
hosted_by_gobject_id: 0,
template_chain: Vec::new(),
attributes: Vec::new(),
}
}
fn build_browse_reply(
children: Vec<GalaxyObject>,
child_has_children: Vec<bool>,
cache_sequence: u64,
) -> BrowseChildrenReply {
BrowseChildrenReply {
total_child_count: children.len() as i32,
cache_sequence,
children,
child_has_children,
next_page_token: String::new(),
}
}
#[tokio::test]
async fn browse_no_parent_returns_roots() {
let state = Arc::new(FakeState::default());
state
.browse_children_replies
.lock()
.unwrap()
.push_back(build_browse_reply(
vec![browse_obj(1, "Area_A", true), browse_obj(2, "Area_B", true)],
vec![true, false],
7,
));
let endpoint = spawn_fake(state.clone()).await;
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let roots = client.browse(None).await.unwrap();
assert_eq!(roots.len(), 2);
assert_eq!(roots[0].object().tag_name, "Area_A");
assert!(roots[0].has_children_hint());
assert_eq!(roots[1].object().tag_name, "Area_B");
assert!(!roots[1].has_children_hint());
let calls = state.browse_children_calls.lock().unwrap();
assert_eq!(calls.len(), 1);
assert!(
calls[0].parent.is_none(),
"root browse must send an empty parent oneof, got {:?}",
calls[0].parent
);
}
#[tokio::test]
async fn browse_expand_populates_children_and_marks_expanded() {
let state = Arc::new(FakeState::default());
// First call: roots.
state
.browse_children_replies
.lock()
.unwrap()
.push_back(build_browse_reply(
vec![browse_obj(10, "Area_A", true)],
vec![true],
1,
));
// Second call: children of gobject 10.
state
.browse_children_replies
.lock()
.unwrap()
.push_back(build_browse_reply(
vec![browse_obj(11, "Receiver_1", false)],
vec![false],
1,
));
let endpoint = spawn_fake(state.clone()).await;
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let roots = client.browse(None).await.unwrap();
let root = roots.into_iter().next().expect("at least one root");
assert!(!root.is_expanded().await);
root.expand().await.unwrap();
assert!(root.is_expanded().await);
let children = root.children().await;
assert_eq!(children.len(), 1);
assert_eq!(children[0].object().tag_name, "Receiver_1");
let calls = state.browse_children_calls.lock().unwrap();
assert_eq!(calls.len(), 2);
let expand_call = &calls[1];
match expand_call.parent.as_ref().expect("expand sends parent") {
browse_children_request::Parent::ParentGobjectId(id) => assert_eq!(*id, 10),
other => panic!("expected ParentGobjectId variant, got {other:?}"),
}
}
#[tokio::test]
async fn browse_expand_idempotent_no_second_rpc() {
let state = Arc::new(FakeState::default());
state
.browse_children_replies
.lock()
.unwrap()
.push_back(build_browse_reply(
vec![browse_obj(20, "Area_X", true)],
vec![true],
1,
));
state
.browse_children_replies
.lock()
.unwrap()
.push_back(build_browse_reply(
vec![browse_obj(21, "Leaf", false)],
vec![false],
1,
));
let endpoint = spawn_fake(state.clone()).await;
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let roots = client.browse(None).await.unwrap();
let root = roots.into_iter().next().unwrap();
root.expand().await.unwrap();
let after_first = state.browse_children_calls.lock().unwrap().len();
// Calling expand a second time must NOT issue a new RPC.
root.expand().await.unwrap();
let after_second = state.browse_children_calls.lock().unwrap().len();
assert_eq!(
after_first, after_second,
"expand should be idempotent — no extra RPC the second time"
);
assert_eq!(root.children().await.len(), 1);
}
#[tokio::test]
async fn browse_expand_unknown_parent_returns_not_found_error() {
let state = Arc::new(FakeState::default());
// Root browse succeeds.
state
.browse_children_replies
.lock()
.unwrap()
.push_back(build_browse_reply(
vec![browse_obj(99, "GhostArea", true)],
vec![true],
1,
));
let endpoint = spawn_fake(state.clone()).await;
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let roots = client.browse(None).await.unwrap();
let root = roots.into_iter().next().unwrap();
// Seed the NotFound only AFTER the root call so the FakeGalaxy's
// error stack doesn't intercept the initial browse.
state
.browse_children_errors
.lock()
.unwrap()
.push(Status::not_found("parent gobject 99 not present in cache"));
let error = root.expand().await.unwrap_err();
match &error {
Error::Status(status) => {
assert_eq!(status.code(), tonic::Code::NotFound);
}
other => panic!("expected Error::Status(NotFound), got {other:?}"),
}
// Failed expand must NOT mark the node as expanded — caller can retry.
assert!(!root.is_expanded().await);
assert!(root.children().await.is_empty());
}
#[tokio::test]
async fn browse_expand_multi_page_gathers_all_pages() {
let state = Arc::new(FakeState::default());
// First reply: roots.
state
.browse_children_replies
.lock()
.unwrap()
.push_back(build_browse_reply(
vec![browse_obj(30, "Plant", true)],
vec![true],
5,
));
// Second reply: page 1 of children, with a next_page_token.
let mut page_one = build_browse_reply(
vec![
browse_obj(31, "Child_A", false),
browse_obj(32, "Child_B", false),
],
vec![false, false],
5,
);
page_one.next_page_token = "cursor-2".to_owned();
page_one.total_child_count = 3;
state
.browse_children_replies
.lock()
.unwrap()
.push_back(page_one);
// Third reply: page 2 of children, with no next page.
state
.browse_children_replies
.lock()
.unwrap()
.push_back(build_browse_reply(
vec![browse_obj(33, "Child_C", false)],
vec![false],
5,
));
let endpoint = spawn_fake(state.clone()).await;
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let roots = client.browse(None).await.unwrap();
let root = roots.into_iter().next().unwrap();
root.expand().await.unwrap();
let children = root.children().await;
assert_eq!(children.len(), 3);
assert_eq!(children[0].object().tag_name, "Child_A");
assert_eq!(children[1].object().tag_name, "Child_B");
assert_eq!(children[2].object().tag_name, "Child_C");
let calls = state.browse_children_calls.lock().unwrap();
// 1 root call + 2 paged expand calls = 3 total.
assert_eq!(calls.len(), 3);
assert_eq!(calls[1].page_token, "");
assert_eq!(calls[2].page_token, "cursor-2");
}
#[tokio::test]
async fn browse_with_filter_forwards_to_request() {
let state = Arc::new(FakeState::default());
state
.browse_children_replies
.lock()
.unwrap()
.push_back(build_browse_reply(Vec::new(), Vec::new(), 1));
let endpoint = spawn_fake(state.clone()).await;
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let options = BrowseChildrenOptions {
category_ids: vec![3, 5],
template_chain_contains: vec!["$DelmiaReceiver".to_owned()],
tag_name_glob: Some("Recv_*".to_owned()),
include_attributes: Some(true),
alarm_bearing_only: true,
historized_only: false,
};
let _ = client.browse(Some(options)).await.unwrap();
let calls = state.browse_children_calls.lock().unwrap();
assert_eq!(calls.len(), 1);
let req = &calls[0];
assert_eq!(req.category_ids, vec![3, 5]);
assert_eq!(req.template_chain_contains, vec!["$DelmiaReceiver"]);
assert_eq!(req.tag_name_glob, "Recv_*");
assert_eq!(req.include_attributes, Some(true));
assert!(req.alarm_bearing_only);
assert!(!req.historized_only);
}
}
@@ -0,0 +1,240 @@
# Client Lazy-Browse Walker Helpers + Per-Language Tests
Date: 2026-05-28
Status: approved, ready for implementation plan
## Problem
The `BrowseChildren` RPC shipped (branch `feat/lazy-browse-children`, merged
or pending merge), but each language client exposes only the raw generated
gRPC stub. Callers must hand-write recursion, sibling pagination, and
NotFound translation themselves. Only one client (.NET) has a smoke test,
and it is skippable.
This work adds a small high-level walker to each client and unit tests so
callers can build OPC UA-style browse trees without re-implementing the
same plumbing five times.
## Scope
Each of the five clients (.NET, Python, Rust, Go, Java) gains:
1. A low-level `BrowseChildren*Async` wrapper on the existing
`GalaxyRepositoryClient`, mirroring the existing `DiscoverHierarchy*Async`
shape.
2. A high-level `LazyBrowseNode` type plus a `BrowseAsync` factory.
3. Five unit tests against the language's existing fake-transport fixture.
Plus a one-time toolchain bootstrap so the Java client builds locally on
the macOS dev host (Homebrew install of Temurin 21 + Gradle).
## Architecture
`LazyBrowseNode` is shared in shape across languages:
```text
LazyBrowseNode {
Object GalaxyObject (immutable, from server)
HasChildrenHint bool (server's child_has_children value)
Children list<LazyBrowseNode> (empty until Expand)
IsExpanded bool
ExpandAsync(ct) Task (idempotent; no-op after first call)
}
```
`GalaxyRepositoryClient.BrowseAsync(parent?, ct)` returns a list of root
`LazyBrowseNode`s. Empty `parent` means structural roots. Each returned
node is unexpanded; the caller invokes `ExpandAsync` to fetch direct
children. After expand, `Children` is a list of further `LazyBrowseNode`s.
**Pagination is hidden.** `ExpandAsync` walks `next_page_token` internally
until all siblings of this parent are gathered. Callers see one flat
`Children` list.
**Errors:** server `NotFound` becomes a language-idiomatic typed error
(`MxGatewayException` in .NET, `GalaxyNotFoundError` in Python,
`GalaxyError::NotFound` in Rust, typed error in Go,
`GalaxyNotFoundException` in Java).
**Filters:** `BrowseAsync` accepts a `BrowseChildrenOptions` (or
language-equivalent) mirroring the existing `DiscoverHierarchyOptions`. The
same options apply to every `ExpandAsync` call rooted from that factory
call — stored on the node so child expansions inherit them.
## Per-language API
Each language adapts to its own idioms; the structure is parallel.
### .NET (`clients/dotnet/ZB.MOM.WW.MxGateway.Client/GalaxyRepositoryClient.cs`)
```csharp
public sealed class LazyBrowseNode
{
public GalaxyObject Object { get; }
public bool HasChildrenHint { get; }
public IReadOnlyList<LazyBrowseNode> Children { get; }
public bool IsExpanded { get; }
public Task ExpandAsync(CancellationToken ct = default);
}
public Task<IReadOnlyList<LazyBrowseNode>> BrowseAsync(
BrowseChildrenOptions? options = null,
CancellationToken ct = default);
public Task<BrowseChildrenReply> BrowseChildrenRawAsync(
BrowseChildrenRequest request,
CancellationToken ct = default);
```
### Python (`clients/python/src/zb_mom_ww_mxgateway/galaxy.py`)
```python
@dataclass
class LazyBrowseNode:
object: GalaxyObject
has_children_hint: bool
children: list["LazyBrowseNode"]
is_expanded: bool
async def expand(self) -> None: ...
async def browse(
self,
options: BrowseChildrenOptions | None = None,
) -> list[LazyBrowseNode]: ...
```
### Rust (`clients/rust/src/galaxy.rs`)
```rust
pub struct LazyBrowseNode { /* private fields; Arc<Mutex<>> for Children */ }
impl LazyBrowseNode {
pub fn object(&self) -> &GalaxyObject;
pub fn has_children_hint(&self) -> bool;
pub fn children(&self) -> Vec<LazyBrowseNode>; // cloned snapshot
pub fn is_expanded(&self) -> bool;
pub async fn expand(&self) -> Result<(), GalaxyError>;
}
pub async fn browse(
&self,
options: Option<BrowseChildrenOptions>,
) -> Result<Vec<LazyBrowseNode>, GalaxyError>;
```
### Go (`clients/go/mxgateway/galaxy.go`)
```go
type LazyBrowseNode struct { /* unexported */ }
func (n *LazyBrowseNode) Object() *pb.GalaxyObject
func (n *LazyBrowseNode) HasChildrenHint() bool
func (n *LazyBrowseNode) Children() []*LazyBrowseNode
func (n *LazyBrowseNode) IsExpanded() bool
func (n *LazyBrowseNode) Expand(ctx context.Context) error
func (c *Client) Browse(
ctx context.Context,
opts *BrowseChildrenOptions,
) ([]*LazyBrowseNode, error)
```
### Java (`clients/java/zb-mom-ww-mxgateway-client/`)
```java
public final class LazyBrowseNode {
public GalaxyObject getObject();
public boolean hasChildrenHint();
public List<LazyBrowseNode> getChildren();
public boolean isExpanded();
public CompletableFuture<Void> expandAsync();
}
public CompletableFuture<List<LazyBrowseNode>> browseAsync(
BrowseChildrenOptions options);
```
If the existing Java client surface is synchronous, mirror that — both
sync and async variants are acceptable as long as the choice matches the
client's existing convention.
## Tests
Each language adds these six facts against its existing fake-transport
fixture (`FakeGalaxyRepositoryTransport` in .NET, the equivalent in each
other client):
| # | Test | Purpose |
|---|------|---------|
| 1 | `Browse_NoParent_ReturnsRoots` | factory returns roots, each unexpanded, hint reflects fake's `child_has_children` |
| 2 | `Expand_PopulatesChildrenAndMarksExpanded` | one ExpandAsync call fires one BrowseChildren RPC; Children populated; IsExpanded flips |
| 3 | `Expand_CalledTwice_NoSecondRpc` | idempotency — fake records RPC count == 1 |
| 4 | `Expand_UnknownParent_ThrowsGalaxyNotFound` | server NotFound surfaces as language-typed error |
| 5 | `Expand_MultiPageSiblings_GathersAllPages` | fake returns NextPageToken on first call; helper walks pages until empty; flat Children list |
| 6 | `Browse_WithFilter_ForwardsToRequest` | options propagate into the wire request (`tag_name_glob` etc.) |
No new live-only tests in this batch. The existing
`BrowseChildrenSmokeTests` in .NET covers wire compatibility.
## Java toolchain bootstrap
The macOS dev host lacks a JVM. Install via Homebrew (one-time):
```bash
brew install temurin@21
brew install gradle
```
Verify:
```bash
java -version # expect 21.x
gradle --version # expect 8.x or 9.x
```
If the Temurin formula does not auto-link `JAVA_HOME`, add to shell init:
```bash
export JAVA_HOME="$(/usr/libexec/java_home -v 21)"
```
Then verify the existing Java client builds against its committed
generated tree:
```bash
cd clients/java
gradle build -x test
```
If build succeeds, regenerate protos to pick up `BrowseChildren`:
```bash
gradle generateProto
```
This produces the new Java RPC stubs that the walker work depends on.
**Failure path:** if Homebrew installs but `gradle build` fails for an
environmental reason (e.g., proto plugin version mismatch), fall back to
"defer Java" — implement the other four clients and document that Java
walker work waits for the Windows host. Do not spend more than ~30
minutes debugging local Java issues; the Windows host already builds the
Java client cleanly.
## Documentation updates
Each client's `README.md` "Browsing lazily" snippet (added in commit
`0d6193c`) gets one short example block showing the high-level walker
in addition to the existing raw-RPC snippet. Approximately three
sentences plus a 5-line code block per language.
No changes to `gateway.md`, `docs/GalaxyRepository.md`, or
`docs/DesignDecisions.md` — those describe the wire contract; the
walkers are client-side ergonomics, not part of the wire surface.
## Non-goals
- Async iterator / streaming walker (rejected in brainstorming —
encourages eager-to-completion consumption that defeats laziness).
- Explicit `RefreshAsync` on `LazyBrowseNode` (single-shot expand is
enough; caller invalidates the tree by re-calling `BrowseAsync`).
- Tree-builder helpers that pre-fetch the whole hierarchy (that's just
`DiscoverHierarchy` with extra round-trips).
- Server changes — the wire contract is final.
- Cross-client integration test runner — each client tests in isolation.
- Java regen on Mac if Homebrew install fails — defer to Windows host.
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,15 @@
{
"planPath": "docs/plans/2026-05-28-client-walker-implementation.md",
"tasks": [
{"id": 23, "subject": "Task 0: Branch state check", "status": "pending"},
{"id": 24, "subject": "Task 1: Java toolchain bootstrap", "status": "pending", "blockedBy": [23]},
{"id": 25, "subject": "Task 2: .NET LazyBrowseNode walker + 6 tests", "status": "pending", "blockedBy": [23]},
{"id": 26, "subject": "Task 3: Python LazyBrowseNode walker + 6 tests", "status": "pending", "blockedBy": [23]},
{"id": 27, "subject": "Task 4: Rust LazyBrowseNode walker + 6 tests", "status": "pending", "blockedBy": [23]},
{"id": 28, "subject": "Task 5: Go LazyBrowseNode walker + 6 tests", "status": "pending", "blockedBy": [23]},
{"id": 29, "subject": "Task 6: Java LazyBrowseNode walker + tests", "status": "pending", "blockedBy": [23, 24]},
{"id": 30, "subject": "Task 7: README walker examples for all 5 clients", "status": "pending", "blockedBy": [25, 26, 27, 28]},
{"id": 31, "subject": "Task 8: Final integration build + verification", "status": "pending", "blockedBy": [29, 30]}
],
"lastUpdated": "2026-05-28T18:30:00Z"
}