a0203503a7
Re-reviewed every module/client against the 10-category checklist
(REVIEW-PROCESS.md) at commit 1cd51bb, filed 72 new findings, and
fixed them in three priority waves (3 High, 17 Medium, 52 Low).

Highs
- Server-017: enumerate AcknowledgeAlarm / QueryActiveAlarms in
  GatewayGrpcScopeResolver so non-admin keys can use them; document
  the mapping in docs/Authorization.md; add interceptor tests.
- Client.Java-013: add the five missing bulk-method stubs to the
  CLI FakeSession so the test module compiles on a clean tree.
- Client.Rust-013: fix the clippy::doc_lazy_continuation regression
  in generated tonic code by reformatting the ReadBulkCommand proto
  comment and scoping a #![allow(...)] to the generated submodules.

Mediums (highlights)
- Server: unify GatewaySession state-lock discipline (-015) and
  make DisposeAsync race-safe against in-flight CloseAsync (-016);
  add constraint-enforcement test coverage for the bulk-plan path
  (-021).
- Worker: introduce StaRuntimeShutdownException so RunAlarmPollLoop
  can distinguish graceful shutdown from a real STA-affinity
  violation (-016); have the watchdog skip StaHung while
  CurrentCommandCorrelationId is non-empty so a legitimate slow
  ReadBulk no longer self-faults (-017).
- Tests: add per-method round-trip + cancellation coverage for the
  11 GatewaySession bulk methods (-013); replace the real TCP probe
  in GalaxyHierarchyCacheTests with an IGalaxyRepository fake
  (-016).
- IntegrationTests: drive the StreamEvents writer in the live Write
  test and assert OnWriteComplete (-012); add live tests for
  Unadvise/RemoveItem/Unregister ordering, WriteSecured, and
  abnormal worker exit (-014).
- Worker.Tests: replace MxAccessSession reflection with an internal
  CreateForTesting factory (-016); cover WorkerCancel and
  unexpected-body envelope branches (-017).
- Client.Java: cancel MxEventStream when close() races
  beforeStart() (-014); return a CancellingCompletableFuture that
  actually forwards cancellation through .thenApply chains (-015).
- Client.Python: drop the silent localhost-plaintext downgrade in
  the CLI; require explicit --plaintext (-013).
- Client.Rust: stop bench-read-bulk from polluting success-latency
  histograms with failed-call durations (-015); add coverage for
  the five MalformedReply paths, the bulk-write helpers, the
  Error::Unavailable mapping, and the unary-fault path (-016).
- Contracts: extend docs/Contracts.md with the bulk read/write
  command family (-009).

Lows (highlights)
- Server: cap GalaxyGlobMatcher.RegexCache; align
  WorkerAlarmRpcDispatcher missing-session handling; drop the
  duplicate dashboard @page routes; refresh IAlarmRpcDispatcher
  XML doc.
- Worker: surface SetXmlAlarmQuery COM failures; remove dead
  subscriptionExpression / ExecutingCommand arms; preserve
  factory-supplied runtime sessions; split MxAlarmSnapshot.cs into
  three files.
- Tests: dispose the WebApplication in seven test classes; rebuild
  FakeWorkerProcess.WaitForExitAsync against a real
  TaskCompletionSource; switch the heartbeat-expires test to
  ManualTimeProvider; add InvariantCulture to the remaining
  DateTimeOffset.Parse sites; document GalaxyFilterInputSafetyTests
  in GatewayTesting.md.
- IntegrationTests: comment fixes, RecordingServerStreamWriter
  IDisposable, class-level [Trait], single-source ZB default
  connection string.
- Worker.Tests: replace silent-return gating with LiveMxAccessFact
  so absent env vars SKIP rather than pass; PascalCase rename of
  probe [Fact]s; deterministic deadline test; new frame-protocol
  error tests; ComputeTransitions diff-coverage; relocate dev-rig
  probes to Probes/.
- Contracts: add round-trip coverage and per-field redaction /
  Galaxy-identifier comments to the protos.
- Client.Dotnet: introduce clients/dotnet/Directory.Build.props so
  TreatWarningsAsErrors / analysers apply; document
  DiscoverHierarchyOptions and IMxGatewayCliClient; require typed
  bulk-read handles in CLI; surface AcknowledgeAlarm transport
  faults through Translate().
- Client.Go: kill dead code in alarms_test / fakeGalaxyServer /
  runWriteBulkVariant; document the six new subcommands in
  writeUsage; drain galaxy-watch events on limit; switch io.EOF
  comparisons to errors.Is.
- Client.Java: shared shutdown helpers + new shutdownTimeout
  option; regex-based credential redaction; Long.toUnsignedString
  for uint64 sequence; doc fixes.
- Client.Python: combine duplicate imports; add coverage for
  _percentile / bench-read-bulk / MAX_AGGREGATE_EVENTS /
  _api_key_from_env; populate pyproject metadata and ship py.typed.
- Client.Rust: expose next_correlation_id() so CLI ping/close
  stop hard-coding correlation IDs; resync RustClientDesign.md
  with the current Session / Error surface and CLI subcommand set.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
218 lines
7.1 KiB
Go
package mxgateway

import (
	"context"
	"errors"
	"io"
	"time"

	pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/types/known/timestamppb"
)

// RawGalaxyRepositoryClient is the generated gRPC client interface for the
// Galaxy Repository service exposed for callers that need direct contract
// access.
type RawGalaxyRepositoryClient = pb.GalaxyRepositoryClient

// Generated protobuf aliases for Galaxy Repository messages.
type (
	// TestConnectionRequest is the request for Galaxy Repository TestConnection.
	TestConnectionRequest = pb.TestConnectionRequest
	// TestConnectionReply is the reply for Galaxy Repository TestConnection.
	TestConnectionReply = pb.TestConnectionReply
	// GetLastDeployTimeRequest is the request for GetLastDeployTime.
	GetLastDeployTimeRequest = pb.GetLastDeployTimeRequest
	// GetLastDeployTimeReply is the reply for GetLastDeployTime.
	GetLastDeployTimeReply = pb.GetLastDeployTimeReply
	// DiscoverHierarchyRequest is the request for DiscoverHierarchy.
	DiscoverHierarchyRequest = pb.DiscoverHierarchyRequest
	// DiscoverHierarchyReply is the reply for DiscoverHierarchy.
	DiscoverHierarchyReply = pb.DiscoverHierarchyReply
	// GalaxyObject describes one Galaxy object with its dynamic attributes.
	GalaxyObject = pb.GalaxyObject
	// GalaxyAttribute describes one dynamic attribute on a GalaxyObject.
	GalaxyAttribute = pb.GalaxyAttribute
	// WatchDeployEventsRequest is the request for WatchDeployEvents.
	WatchDeployEventsRequest = pb.WatchDeployEventsRequest
	// DeployEvent is one Galaxy Repository deploy event.
	DeployEvent = pb.DeployEvent
)

// RawDeployEventStream is the generated WatchDeployEvents client stream.
type RawDeployEventStream = grpc.ServerStreamingClient[pb.DeployEvent]

// GalaxyClient owns a gateway gRPC connection and exposes Galaxy Repository
// browse helpers. It mirrors the structure of Client and uses the same
// connection-management conventions.
type GalaxyClient struct {
	conn *grpc.ClientConn
	raw  pb.GalaxyRepositoryClient
	opts Options
}

// DialGalaxy opens a gRPC connection to the gateway for the Galaxy Repository
// service. It applies the same authentication metadata, transport security,
// lazy connection, and DialTimeout-bounded readiness probe as Dial.
func DialGalaxy(ctx context.Context, opts Options) (*GalaxyClient, error) {
	conn, err := dial(ctx, opts)
	if err != nil {
		return nil, err
	}

	return NewGalaxyClient(conn, opts), nil
}

// NewGalaxyClient wraps an existing gRPC connection for Galaxy Repository
// access. The caller owns closing conn unless it calls Close on the returned
// GalaxyClient.
func NewGalaxyClient(conn *grpc.ClientConn, opts Options) *GalaxyClient {
	return &GalaxyClient{
		conn: conn,
		raw:  pb.NewGalaxyRepositoryClient(conn),
		opts: opts,
	}
}

// RawClient returns the generated gRPC client for command-specific parity
// tests.
func (c *GalaxyClient) RawClient() RawGalaxyRepositoryClient {
	return c.raw
}

// TestConnection probes the Galaxy Repository service. It returns the server's
// reported ok flag and a non-nil error only when the RPC itself fails.
func (c *GalaxyClient) TestConnection(ctx context.Context) (bool, error) {
	callCtx, cancel := c.callContext(ctx)
	defer cancel()

	reply, err := c.raw.TestConnection(callCtx, &pb.TestConnectionRequest{})
	if err != nil {
		return false, &GatewayError{Op: "galaxy test connection", Err: err}
	}
	return reply.GetOk(), nil
}

// GetLastDeployTime returns the Galaxy's last deploy timestamp. When the server
// reports present=false (no deploy recorded yet) the call returns
// (time.Time{}, false, nil). When present=true the timestamp is returned in
// UTC with present=true.
func (c *GalaxyClient) GetLastDeployTime(ctx context.Context) (time.Time, bool, error) {
	callCtx, cancel := c.callContext(ctx)
	defer cancel()

	reply, err := c.raw.GetLastDeployTime(callCtx, &pb.GetLastDeployTimeRequest{})
	if err != nil {
		return time.Time{}, false, &GatewayError{Op: "galaxy get last deploy time", Err: err}
	}
	if !reply.GetPresent() {
		return time.Time{}, false, nil
	}
	ts := reply.GetTimeOfLastDeploy()
	if ts == nil {
		return time.Time{}, false, nil
	}
	return ts.AsTime(), true, nil
}

// DiscoverHierarchy returns the deployed Galaxy object hierarchy with each
// object's dynamic attributes. The objects are returned in the order supplied
// by the server.
func (c *GalaxyClient) DiscoverHierarchy(ctx context.Context) ([]*GalaxyObject, error) {
	callCtx, cancel := c.callContext(ctx)
	defer cancel()

	reply, err := c.raw.DiscoverHierarchy(callCtx, &pb.DiscoverHierarchyRequest{})
	if err != nil {
		return nil, &GatewayError{Op: "galaxy discover hierarchy", Err: err}
	}
	return reply.GetObjects(), nil
}

// WatchDeployEventsRaw starts the generated WatchDeployEvents stream for callers
// that want direct control over Recv. The caller owns the returned stream's
// lifetime via ctx cancellation.
func (c *GalaxyClient) WatchDeployEventsRaw(ctx context.Context, req *WatchDeployEventsRequest) (RawDeployEventStream, error) {
	if req == nil {
		req = &pb.WatchDeployEventsRequest{}
	}

	stream, err := c.raw.WatchDeployEvents(ctx, req)
	if err != nil {
		return nil, &GatewayError{Op: "galaxy watch deploy events", Err: err}
	}
	return stream, nil
}

// WatchDeployEvents subscribes to Galaxy deploy events. The server emits a
// bootstrap event with the current state immediately on subscribe, then one
// event per new deploy. When lastSeenDeployTime is non-nil it is forwarded to
// the server to suppress the bootstrap event.
//
// The returned event channel is closed when the server completes the stream
// (io.EOF), when ctx is cancelled, or after a terminal error has been
// delivered on the error channel. The error channel is also closed once the
// stream tears down. Surfaced errors are wrapped in *GatewayError.
//
// Cancel ctx to tear the stream down cleanly.
func (c *GalaxyClient) WatchDeployEvents(
	ctx context.Context,
	lastSeenDeployTime *time.Time,
) (<-chan *DeployEvent, <-chan error, error) {
	req := &pb.WatchDeployEventsRequest{}
	if lastSeenDeployTime != nil {
		req.LastSeenDeployTime = timestamppb.New(*lastSeenDeployTime)
	}

	stream, err := c.WatchDeployEventsRaw(ctx, req)
	if err != nil {
		return nil, nil, err
	}

	events := make(chan *DeployEvent, 16)
	errs := make(chan error, 1)
	go func() {
		defer close(events)
		defer close(errs)
		for {
			event, recvErr := stream.Recv()
			if recvErr == nil {
				select {
				case events <- event:
				case <-ctx.Done():
					return
				}
				continue
			}
			if errors.Is(recvErr, io.EOF) {
				return
			}
			if status.Code(recvErr) == codes.Canceled || ctx.Err() != nil {
				return
			}
			select {
			case errs <- &GatewayError{Op: "galaxy watch deploy events", Err: recvErr}:
			case <-ctx.Done():
			}
			return
		}
	}()

	return events, errs, nil
}

// Close closes the underlying gRPC connection.
func (c *GalaxyClient) Close() error {
	if c == nil || c.conn == nil {
		return nil
	}
	return c.conn.Close()
}

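// callContext derives a per-call context from ctx by passing the client's
// configured CallTimeout to the package-level callContext helper.
func (c *GalaxyClient) callContext(ctx context.Context) (context.Context, context.CancelFunc) {
	return callContext(ctx, c.opts.CallTimeout)
}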