Code-review 2026-05-20 sweep: re-review at 1cd51bb, resolve 72 findings across all 11 modules

Re-reviewed every module/client against the 10-category checklist
(REVIEW-PROCESS.md) at commit 1cd51bb, filed 72 new findings, and
fixed them in three priority waves (3 High, 17 Medium, 52 Low).

Highs
- Server-017: enumerate AcknowledgeAlarm / QueryActiveAlarms in
  GatewayGrpcScopeResolver so non-admin keys can use them; document
  the mapping in docs/Authorization.md; add interceptor tests.
- Client.Java-013: add the five missing bulk-method stubs to the
  CLI FakeSession so the test module compiles on a clean tree.
- Client.Rust-013: fix the clippy::doc_lazy_continuation regression
  in generated tonic code by reformatting the ReadBulkCommand proto
  comment and scoping a #![allow(...)] to the generated submodules.

Mediums (highlights)
- Server: unify GatewaySession state-lock discipline (-015) and
  make DisposeAsync race-safe against in-flight CloseAsync (-016);
  add constraint-enforcement test coverage for the bulk-plan path
  (-021).
- Worker: introduce StaRuntimeShutdownException so RunAlarmPollLoop
  can distinguish graceful shutdown from a real STA-affinity
  violation (-016); have the watchdog skip StaHung while
  CurrentCommandCorrelationId is non-empty so a legitimate slow
  ReadBulk no longer self-faults (-017).
- Tests: add per-method round-trip + cancellation coverage for the
  11 GatewaySession bulk methods (-013); replace the real TCP probe
  in GalaxyHierarchyCacheTests with an IGalaxyRepository fake
  (-016).
- IntegrationTests: drive the StreamEvents writer in the live Write
  test and assert OnWriteComplete (-012); add live tests for
  Unadvise/RemoveItem/Unregister ordering, WriteSecured, and
  abnormal worker exit (-014).
- Worker.Tests: replace MxAccessSession reflection with an internal
  CreateForTesting factory (-016); cover WorkerCancel and
  unexpected-body envelope branches (-017).
- Client.Java: cancel MxEventStream when close() races
  beforeStart() (-014); return a CancellingCompletableFuture that
  actually forwards cancellation through .thenApply chains (-015).
- Client.Python: drop the silent localhost-plaintext downgrade in
  the CLI; require explicit --plaintext (-013).
- Client.Rust: stop bench-read-bulk from polluting success-latency
  histograms with failed-call durations (-015); add coverage for
  the five MalformedReply paths, the bulk-write helpers, the
  Error::Unavailable mapping, and the unary-fault path (-016).
- Contracts: extend docs/Contracts.md with the bulk read/write
  command family (-009).

Lows (highlights)
- Server: cap GalaxyGlobMatcher.RegexCache; align
  WorkerAlarmRpcDispatcher missing-session handling; drop the
  duplicate dashboard @page routes; refresh IAlarmRpcDispatcher
  XML doc.
- Worker: surface SetXmlAlarmQuery COM failures; remove dead
  subscriptionExpression / ExecutingCommand arms; preserve
  factory-supplied runtime sessions; split MxAlarmSnapshot.cs into
  three files.
- Tests: dispose the WebApplication in seven test classes; rebuild
  FakeWorkerProcess.WaitForExitAsync against a real TaskCompletion
  source; switch the heartbeat-expires test to ManualTimeProvider;
  add InvariantCulture to the remaining DateTimeOffset.Parse sites;
  document GalaxyFilterInputSafetyTests in GatewayTesting.md.
- IntegrationTests: comment fixes, RecordingServerStreamWriter
  IDisposable, class-level [Trait], single-source ZB default
  connection string.
- Worker.Tests: replace silent-return gating with LiveMxAccessFact
  so absent env vars SKIP not pass; PascalCase rename of probe
  [Fact]s; deterministic deadline test; new frame-protocol error
  tests; ComputeTransitions diff-coverage; relocate dev-rig probes
  to Probes/.
- Contracts: add round-trip coverage and per-field redaction /
  Galaxy-identifier comments to the protos.
- Client.Dotnet: introduce clients/dotnet/Directory.Build.props so
  TreatWarningsAsErrors / analysers apply; document
  DiscoverHierarchyOptions and IMxGatewayCliClient; require typed
  bulk-read handles in CLI; surface AcknowledgeAlarm transport
  faults through Translate().
- Client.Go: kill dead code in alarms_test / fakeGalaxyServer /
  runWriteBulkVariant; document the six new subcommands in
  writeUsage; drain galaxy-watch events on limit; switch io.EOF
  comparisons to errors.Is.
- Client.Java: shared shutdown helpers + new shutdownTimeout
  option; regex-based credential redaction; Long.toUnsignedString
  for uint64 sequence; doc fixes.
- Client.Python: combine duplicate imports; add coverage for
  _percentile / bench-read-bulk / MAX_AGGREGATE_EVENTS /
  _api_key_from_env; populate pyproject metadata and ship py.typed.
- Client.Rust: expose next_correlation_id() so CLI ping/close
  stop hard-coding correlation IDs; resync RustClientDesign.md
  with the current Session / Error surface and CLI subcommand set.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-20 09:46:47 -04:00
parent 1cd51bbda3
commit a0203503a7
122 changed files with 8723 additions and 757 deletions
+17
View File
@@ -0,0 +1,17 @@
<Project>
<!--
Mirrors src/Directory.Build.props for the .NET client projects under
clients/dotnet/ so they share the same enforcement floor (warnings-as-
errors, latest analyzers, code-style enforcement, deterministic builds)
even though they live outside src/.
-->
<PropertyGroup>
<LangVersion>latest</LangVersion>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<AnalysisLevel>latest</AnalysisLevel>
<EnforceCodeStyleInBuild>true</EnforceCodeStyleInBuild>
<Deterministic>true</Deterministic>
</PropertyGroup>
</Project>
@@ -3,6 +3,12 @@ using MxGateway.Contracts.Proto.Galaxy;
namespace MxGateway.Client.Cli;
/// <summary>
/// Minimal transport surface the CLI talks to. Exposes only the gateway and
/// Galaxy Repository RPCs the CLI needs so tests can substitute an in-process
/// fake without standing up a real gRPC channel. The production binding is a
/// thin adapter over <see cref="MxGatewayClient"/> and <see cref="GalaxyRepositoryClient"/>.
/// </summary>
public interface IMxGatewayCliClient : IAsyncDisposable
{
/// <summary>
@@ -635,7 +635,7 @@ public static class MxGatewayClientCli
}),
cancellationToken)
.ConfigureAwait(false);
int serverHandle = registerReply.Register?.ServerHandle ?? registerReply.ReturnValue.Int32Value;
int serverHandle = RequireRegisterServerHandle(registerReply);
SubscribeBulkCommand subscribe = new() { ServerHandle = serverHandle };
subscribe.TagAddresses.Add(tags);
@@ -893,7 +893,7 @@ public static class MxGatewayClientCli
}),
cancellationToken)
.ConfigureAwait(false);
int serverHandle = registerReply.Register?.ServerHandle ?? registerReply.ReturnValue.Int32Value;
int serverHandle = RequireRegisterServerHandle(registerReply);
SubscribeBulkCommand subscribe = new() { ServerHandle = serverHandle };
subscribe.TagAddresses.Add(tags);
@@ -941,11 +941,16 @@ public static class MxGatewayClientCli
continue;
}
if (firstSteadyEventUtc is null)
// Guarded by latencyLock so parallel sessions can't tear a 64-bit
// DateTime? read or stomp an already-set firstSteadyEventUtc with
// a later timestamp from a slower-to-start session. The lock is
// already held by the latency append a few lines below, so the
// extra cost is one uncontended lock acquisition per event.
lock (latencyLock)
{
firstSteadyEventUtc = nowUtc;
firstSteadyEventUtc ??= nowUtc;
lastSteadyEventUtc = nowUtc;
}
lastSteadyEventUtc = nowUtc;
Interlocked.Increment(ref steadyEvents);
if (mxEvent.Family == MxEventFamily.OnDataChange)
{
@@ -1258,7 +1263,7 @@ public static class MxGatewayClientCli
Kind = MxCommandKind.Register,
Register = new RegisterCommand { ClientName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-smoke" },
},
reply => reply.Register?.ServerHandle ?? reply.ReturnValue.Int32Value,
RequireRegisterServerHandle,
commandReplies,
cancellationToken)
.ConfigureAwait(false);
@@ -1276,7 +1281,7 @@ public static class MxGatewayClientCli
ItemDefinition = arguments.GetRequired("item"),
},
},
reply => reply.AddItem?.ItemHandle ?? reply.ReturnValue.Int32Value,
RequireAddItemItemHandle,
commandReplies,
cancellationToken)
.ConfigureAwait(false);
@@ -1408,6 +1413,41 @@ public static class MxGatewayClientCli
return reply;
}
/// <summary>
/// Returns the server handle from a successful <c>register</c> reply, or throws
/// <see cref="MxGatewayException"/> when the typed <see cref="MxCommandReply.Register"/>
/// payload is absent. Mirrors the SDK-level <see cref="MxGatewaySession.RegisterAsync"/>
/// contract: a successful reply without the typed payload is a gateway protocol
/// error, not a license to fall through to <c>ReturnValue.Int32Value</c> (which is 0
/// when the reply carries no return value).
/// </summary>
private static int RequireRegisterServerHandle(MxCommandReply reply)
{
return reply.Register?.ServerHandle
?? throw CreateMissingPayloadException(reply, "register");
}
/// <summary>
/// Returns the item handle from a successful <c>add_item</c> reply, or throws
/// <see cref="MxGatewayException"/> when the typed <see cref="MxCommandReply.AddItem"/>
/// payload is absent. See <see cref="RequireRegisterServerHandle"/> for the rationale.
/// </summary>
private static int RequireAddItemItemHandle(MxCommandReply reply)
{
return reply.AddItem?.ItemHandle
?? throw CreateMissingPayloadException(reply, "add_item");
}
private static MxGatewayException CreateMissingPayloadException(
MxCommandReply reply,
string expectedPayload)
{
return new MxGatewayException(
$"Gateway reply for command kind={reply.Kind} reported success but is missing "
+ $"the required '{expectedPayload}' payload; cannot resolve a handle. "
+ $"session={reply.SessionId}; correlation={reply.CorrelationId}");
}
private static MxCommandRequest CreateCommandRequest(
string sessionId,
MxCommand command)
@@ -216,7 +216,7 @@ internal sealed class FakeGatewayTransport(MxGatewayClientOptions options) : IMx
AcknowledgeAlarmCalls.Add((request, callOptions));
if (AcknowledgeAlarmExceptions.TryDequeue(out Exception? exception))
{
throw exception;
throw Translate(exception, callOptions);
}
return Task.FromResult(_acknowledgeReplies.Count > 0
@@ -73,19 +73,17 @@ public sealed class MxGatewayClientAlarmsTests
}
[Fact]
public async Task AcknowledgeAlarmAsync_MapsUnauthenticated_RpcException_ToTypedException()
public async Task AcknowledgeAlarmAsync_SurfacesRpcExceptionFromFakeTransportVerbatim_WhenMappingDisabled()
{
// Default FakeGatewayTransport.MapTransportExceptions is false, matching the
// historical pass-through shape: a thrown RpcException reaches the caller as
// RpcException rather than being mapped to a typed MxGatewayException. This
// test pins that shape so a future change can't silently flip it.
FakeGatewayTransport transport = CreateTransport();
transport.AcknowledgeAlarmExceptions.Enqueue(
new RpcException(new Status(StatusCode.Unauthenticated, "expired key")));
await using MxGatewayClient client = CreateClient(transport);
// Note: the FakeGatewayTransport surfaces RpcException directly (it does not run
// through GrpcMxGatewayClientTransport's mapping); the fake's contract here is to
// pass the exception verbatim. RpcException → typed exception mapping is covered
// in the GrpcMxGatewayClientTransport-level tests; the SDK-level test pins the
// pass-through shape so a future migration to direct mapping won't silently
// change observable behaviour.
var ex = await Assert.ThrowsAsync<RpcException>(
() => client.AcknowledgeAlarmAsync(new AcknowledgeAlarmRequest
{
@@ -97,6 +95,32 @@ public sealed class MxGatewayClientAlarmsTests
Assert.Equal(StatusCode.Unauthenticated, ex.StatusCode);
}
[Fact]
public async Task AcknowledgeAlarmAsync_MapsUnauthenticated_RpcException_ToTypedException()
{
// Production parity: GrpcMxGatewayClientTransport.AcknowledgeAlarmAsync runs
// every thrown RpcException through RpcExceptionMapper.Map, so callers see
// MxGatewayAuthenticationException (for Unauthenticated) rather than the raw
// RpcException. The fake transport reproduces that mapping when
// MapTransportExceptions is set, letting this SDK-level test cover the same
// observable behaviour without standing up a real gRPC channel.
FakeGatewayTransport transport = CreateTransport();
transport.MapTransportExceptions = true;
transport.AcknowledgeAlarmExceptions.Enqueue(
new RpcException(new Status(StatusCode.Unauthenticated, "expired key")));
await using MxGatewayClient client = CreateClient(transport);
var ex = await Assert.ThrowsAsync<MxGatewayAuthenticationException>(
() => client.AcknowledgeAlarmAsync(new AcknowledgeAlarmRequest
{
SessionId = "session-fixture",
AlarmFullReference = "Tank01.Level.HiHi",
Comment = string.Empty,
OperatorUser = "alice",
}));
Assert.Equal(StatusCode.Unauthenticated, ex.StatusCode);
}
[Fact]
public async Task QueryActiveAlarmsAsync_StreamsEnqueuedSnapshots()
{
@@ -1,24 +1,67 @@
namespace MxGateway.Client;
/// <summary>
/// Server-side filters and shape options for
/// <see cref="GalaxyRepositoryClient.DiscoverHierarchyAsync(DiscoverHierarchyOptions, System.Threading.CancellationToken)"/>.
/// Each property maps directly to the corresponding field on the
/// <c>DiscoverHierarchyRequest</c> proto so the gateway can narrow the
/// hierarchy walk before serializing it back to the client.
/// </summary>
public sealed record DiscoverHierarchyOptions
{
/// <summary>
/// Root Galaxy object id to start the walk from. When set, takes
/// precedence over <see cref="RootTagName"/> and <see cref="RootContainedPath"/>.
/// </summary>
public int? RootGobjectId { get; init; }
/// <summary>
/// Root tag (assigned) name to start the walk from. Used when
/// <see cref="RootGobjectId"/> is null.
/// </summary>
public string? RootTagName { get; init; }
/// <summary>
/// Root contained-name dotted path to start the walk from. Used when
/// neither <see cref="RootGobjectId"/> nor <see cref="RootTagName"/> are set.
/// </summary>
public string? RootContainedPath { get; init; }
/// <summary>
/// Maximum traversal depth below the root, inclusive. Leave null for the
/// server default (unbounded).
/// </summary>
public int? MaxDepth { get; init; }
/// <summary>
/// Galaxy category ids to include. Empty means all categories.
/// </summary>
public IReadOnlyList<int> CategoryIds { get; init; } = Array.Empty<int>();
/// <summary>
/// Template tag names that must appear somewhere in each returned
/// object's template chain. Empty means no template filter.
/// </summary>
public IReadOnlyList<string> TemplateChainContains { get; init; } = Array.Empty<string>();
/// <summary>
/// Optional glob (e.g. <c>"Tank*"</c>) matched against each object's tag name.
/// </summary>
public string? TagNameGlob { get; init; }
/// <summary>
/// When set, overrides whether each returned <c>GalaxyObject</c> includes
/// its dynamic attribute list. Leave null to use the server default.
/// </summary>
public bool? IncludeAttributes { get; init; }
/// <summary>
/// When true, restrict results to objects that bear at least one configured alarm.
/// </summary>
public bool AlarmBearingOnly { get; init; }
/// <summary>
/// When true, restrict results to objects that have at least one historized attribute.
/// </summary>
public bool HistorizedOnly { get; init; }
}
@@ -23,7 +23,7 @@ public sealed class GalaxyRepositoryClient : IAsyncDisposable
private readonly GrpcChannel? _channel;
private readonly IGalaxyRepositoryClientTransport _transport;
private readonly ResiliencePipeline _safeUnaryRetryPipeline;
private bool _disposed;
private int _disposed;
/// <summary>
/// Initializes a Galaxy Repository client with custom transport and options.
@@ -182,6 +182,17 @@ public sealed class GalaxyRepositoryClient : IAsyncDisposable
return await DiscoverHierarchyAsync(new DiscoverHierarchyOptions(), cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Enumerates the deployed Galaxy object hierarchy with caller-supplied
/// server-side filters. Each returned <see cref="GalaxyObject"/> may include
/// its dynamic attributes (controlled by <see cref="DiscoverHierarchyOptions.IncludeAttributes"/>),
/// so callers can determine which tag references they may subscribe to via
/// the MxAccessGateway service. The client transparently follows the
/// gateway's pagination cursor until the hierarchy is fully drained.
/// </summary>
/// <param name="options">Server-side filter and shape options.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The filtered collection of Galaxy objects.</returns>
public async Task<IReadOnlyList<GalaxyObject>> DiscoverHierarchyAsync(
DiscoverHierarchyOptions options,
CancellationToken cancellationToken = default)
@@ -338,12 +349,11 @@ public sealed class GalaxyRepositoryClient : IAsyncDisposable
/// </summary>
public ValueTask DisposeAsync()
{
if (_disposed)
if (Interlocked.Exchange(ref _disposed, 1) != 0)
{
return ValueTask.CompletedTask;
}
_disposed = true;
_channel?.Dispose();
return ValueTask.CompletedTask;
}
@@ -444,6 +454,6 @@ public sealed class GalaxyRepositoryClient : IAsyncDisposable
private void ThrowIfDisposed()
{
ObjectDisposedException.ThrowIf(_disposed, this);
ObjectDisposedException.ThrowIf(Volatile.Read(ref _disposed) != 0, this);
}
}
+23 -12
View File
@@ -389,25 +389,30 @@ func runReadBulk(ctx context.Context, args []string, stdout, stderr io.Writer) e
}
func runWriteBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
return runWriteBulkVariant(ctx, args, stdout, stderr, "write-bulk", false, false)
return runWriteBulkVariant(ctx, args, stdout, stderr, "write-bulk", false)
}
func runWrite2Bulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
return runWriteBulkVariant(ctx, args, stdout, stderr, "write2-bulk", true, false)
return runWriteBulkVariant(ctx, args, stdout, stderr, "write2-bulk", true)
}
func runWriteSecuredBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
return runWriteBulkVariant(ctx, args, stdout, stderr, "write-secured-bulk", false, true)
return runWriteBulkVariant(ctx, args, stdout, stderr, "write-secured-bulk", false)
}
func runWriteSecured2Bulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
return runWriteBulkVariant(ctx, args, stdout, stderr, "write-secured2-bulk", true, true)
return runWriteBulkVariant(ctx, args, stdout, stderr, "write-secured2-bulk", true)
}
// runWriteBulkVariant shares the flag-parsing + entry-build skeleton across
// the four bulk-write families. withTimestamp adds a --timestamp-value flag;
// secured switches from --user-id to --current-user-id / --verifier-user-id.
func runWriteBulkVariant(ctx context.Context, args []string, stdout, stderr io.Writer, command string, withTimestamp bool, secured bool) error {
// the four bulk-write families. command selects which of the four routes
// runs; withTimestamp adds a --timestamp-value flag for the Write2 / Secured2
// variants. Secured-only flags (--current-user-id / --verifier-user-id) are
// only registered for the secured variants and the non-secured -user-id flag
// is only registered for Write/Write2, so a wrong-variant flag becomes a
// clean "flag provided but not defined" error instead of silently no-op'ing.
func runWriteBulkVariant(ctx context.Context, args []string, stdout, stderr io.Writer, command string, withTimestamp bool) error {
secured := command == "write-secured-bulk" || command == "write-secured2-bulk"
flags := flag.NewFlagSet(command, flag.ContinueOnError)
flags.SetOutput(stderr)
common := bindCommonFlags(flags)
@@ -417,9 +422,13 @@ func runWriteBulkVariant(ctx context.Context, args []string, stdout, stderr io.W
itemHandles := flags.String("item-handles", "", "comma-separated item handles")
valueType := flags.String("type", "string", "value type: bool, int32, int64, float, double, string")
values := flags.String("values", "", "comma-separated values (one per item handle)")
userID := flags.Int("user-id", 0, "MXAccess user id (Write/Write2 variants)")
currentUserID := flags.Int("current-user-id", 0, "MXAccess current user id (Secured variants)")
verifierUserID := flags.Int("verifier-user-id", 0, "MXAccess verifier user id (Secured variants)")
var userID, currentUserID, verifierUserID *int
if secured {
currentUserID = flags.Int("current-user-id", 0, "MXAccess current user id (Secured variants)")
verifierUserID = flags.Int("verifier-user-id", 0, "MXAccess verifier user id (Secured variants)")
} else {
userID = flags.Int("user-id", 0, "MXAccess user id (Write/Write2 variants)")
}
timestampValue := flags.String("timestamp-value", "", "RFC 3339 timestamp shared across all entries (Write2/WriteSecured2 variants)")
if err := flags.Parse(args); err != nil {
@@ -507,7 +516,6 @@ func runWriteBulkVariant(ctx context.Context, args []string, stdout, stderr io.W
default:
return fmt.Errorf("unsupported bulk write command %q", command)
}
_ = secured // currently only used for routing above; reserved for future per-variant validation
return writeWriteBulkOutput(stdout, *jsonOutput, command, options, results, err)
}
@@ -1061,7 +1069,7 @@ type protojsonMessage interface {
}
func writeUsage(writer io.Writer) {
fmt.Fprintln(writer, "usage: mxgw-go <version|open-session|close-session|register|add-item|advise|subscribe-bulk|unsubscribe-bulk|write|stream-events|smoke|galaxy-test-connection|galaxy-last-deploy|galaxy-discover|galaxy-watch>")
fmt.Fprintln(writer, "usage: mxgw-go <version|open-session|close-session|register|add-item|advise|subscribe-bulk|unsubscribe-bulk|read-bulk|write-bulk|write2-bulk|write-secured-bulk|write-secured2-bulk|bench-read-bulk|write|stream-events|smoke|galaxy-test-connection|galaxy-last-deploy|galaxy-discover|galaxy-watch>")
}
func dialGalaxyForCommand(ctx context.Context, common *commonOptions) (*mxgateway.GalaxyClient, commonOptions, error) {
@@ -1245,6 +1253,9 @@ func runGalaxyWatch(ctx context.Context, args []string, stdout, stderr io.Writer
count++
if *limit > 0 && count >= *limit {
cancelStream()
// Allow goroutine to drain.
for range events {
}
return nil
}
case streamErr, ok := <-errs:
-4
View File
@@ -64,10 +64,6 @@ func TestAcknowledgeAlarmRejectsNilRequest(t *testing.T) {
defer cleanup()
_, err := client.AcknowledgeAlarm(context.Background(), nil)
if err == nil || !errors.Is(err, errors.Unwrap(err)) && err.Error() != "mxgateway: acknowledge alarm request is required" {
// Accept either: the helper returned the literal sentinel, or the
// generic transport error — both prove nil was rejected.
}
if err == nil {
t.Fatalf("AcknowledgeAlarm(nil) returned no error")
}
+2 -1
View File
@@ -2,6 +2,7 @@ package mxgateway
import (
"context"
"errors"
"io"
"time"
@@ -186,7 +187,7 @@ func (c *GalaxyClient) WatchDeployEvents(
}
continue
}
if recvErr == io.EOF {
if errors.Is(recvErr, io.EOF) {
return
}
if status.Code(recvErr) == codes.Canceled || ctx.Err() != nil {
+8 -16
View File
@@ -372,15 +372,14 @@ func newGalaxyBufconnClient(t *testing.T, fake *fakeGalaxyServer) (*GalaxyClient
type fakeGalaxyServer struct {
pb.UnimplementedGalaxyRepositoryServer
testReply *pb.TestConnectionReply
testAuth string
failTest bool
deployReply *pb.GetLastDeployTimeReply
discoverReply *pb.DiscoverHierarchyReply
watchEvents []*pb.DeployEvent
watchRequest *pb.WatchDeployEventsRequest
watchSendInterval time.Duration
watchHoldOpen bool
testReply *pb.TestConnectionReply
testAuth string
failTest bool
deployReply *pb.GetLastDeployTimeReply
discoverReply *pb.DiscoverHierarchyReply
watchEvents []*pb.DeployEvent
watchRequest *pb.WatchDeployEventsRequest
watchHoldOpen bool
}
func (s *fakeGalaxyServer) TestConnection(ctx context.Context, req *pb.TestConnectionRequest) (*pb.TestConnectionReply, error) {
@@ -414,13 +413,6 @@ func (s *fakeGalaxyServer) WatchDeployEvents(req *pb.WatchDeployEventsRequest, s
if err := stream.Send(event); err != nil {
return err
}
if s.watchSendInterval > 0 {
select {
case <-time.After(s.watchSendInterval):
case <-stream.Context().Done():
return stream.Context().Err()
}
}
}
if s.watchHoldOpen {
<-stream.Context().Done()
+1 -1
View File
@@ -599,7 +599,7 @@ func (s *Session) subscribeEventsAfter(ctx context.Context, afterWorkerSequence
}
continue
}
if err == io.EOF || status.Code(err) == codes.Canceled || streamCtx.Err() != nil {
if errors.Is(err, io.EOF) || status.Code(err) == codes.Canceled || streamCtx.Err() != nil {
return
}
sendEventResult(
+19 -8
View File
@@ -88,8 +88,9 @@ observe the close result or handle a close-time failure.
`MxGatewayClient` and `GalaxyRepositoryClient` implement `AutoCloseable`. For a
client that owns its channel (built with `connect`), the try-with-resources
`close()` shuts the channel down and waits up to the configured connect timeout
for termination, forcibly shutting it down on timeout, so in-flight calls and
`close()` shuts the channel down and waits up to the configured
`shutdownTimeout` (default 10 s, independent of `connectTimeout`) for
termination, forcibly shutting it down on timeout, so in-flight calls and
Netty event-loop threads are not left running after the block exits. If the
calling thread is interrupted while waiting, the channel is forcibly shut down
and the interrupt flag is restored. `closeAndAwaitTermination()` does the same
@@ -99,12 +100,22 @@ blocking-aware shutdown. `close()` is a no-op for a caller-managed channel.
`MxEventStream` implements `Iterator<MxEvent>` and `AutoCloseable`. Closing it
cancels the underlying gRPC stream. Canceling or timing out a Java client call
only stops the client from waiting; it does not abort an in-flight MXAccess COM
call on the worker STA. The event stream uses gRPC's default auto-inbound flow
control with a fixed 16-element buffer and no client-side flow control: this is
the gateway's documented fail-fast event-backpressure model, so a consumer that
stalls long enough to fill the buffer triggers an overflow that cancels the
subscription and surfaces an `MxGatewayException` from the next `next()` call.
Drain events promptly and be prepared to resubscribe with a resume cursor.
call on the worker STA. Closing an `MxEventStream` *before* the gRPC call has
attached its observer (a real race when callers cancel immediately after
subscribing) is safe — the close is replayed in the observer's `beforeStart`
and the underlying call is cancelled, matching `DeployEventStream` behaviour.
The event stream uses gRPC's default auto-inbound flow control with a fixed
1024-element buffer and no client-side flow control: this is the gateway's
documented fail-fast event-backpressure model, so a consumer that stalls long
enough to fill the buffer triggers an overflow that cancels the subscription
and surfaces an `MxGatewayException` from the next `next()` call. Drain events
promptly and be prepared to resubscribe with a resume cursor.
Cancellation of `CompletableFuture` results from `openSessionAsync`,
`invokeAsync`, `acknowledgeAlarmAsync`, `getLastDeployTimeAsync`,
`testConnectionAsync`, and `discoverHierarchyAsync` forwards to the underlying
gRPC call: calling `cancel(true)` on the returned future aborts the in-flight
RPC instead of merely detaching the future from its result.
## Galaxy Repository Browse
@@ -242,9 +242,12 @@ public final class MxGatewayCli implements Callable<Integer> {
if (json) {
out.println(protoJson(event));
} else {
// sequence is a proto uint64 — print as unsigned so values
// past 2^63 do not render as negative signed longs. JSON
// path goes through JsonFormat which already does this.
out.printf(
"seq=%d observed=%s deployTime=%s objects=%d attributes=%d%n",
event.getSequence(),
"seq=%s observed=%s deployTime=%s objects=%d attributes=%d%n",
Long.toUnsignedString(event.getSequence()),
formatTimestamp(event.getObservedAt()),
event.getTimeOfLastDeployPresent()
? formatTimestamp(event.getTimeOfLastDeploy())
@@ -9,6 +9,8 @@ import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import mxaccess_gateway.v1.MxaccessGateway.AddItemReply;
import mxaccess_gateway.v1.MxaccessGateway.BulkReadResult;
import mxaccess_gateway.v1.MxaccessGateway.BulkWriteResult;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
import mxaccess_gateway.v1.MxaccessGateway.MxCommandKind;
@@ -22,6 +24,10 @@ import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode;
import mxaccess_gateway.v1.MxaccessGateway.RegisterReply;
import mxaccess_gateway.v1.MxaccessGateway.SessionState;
import mxaccess_gateway.v1.MxaccessGateway.SubscribeResult;
import mxaccess_gateway.v1.MxaccessGateway.Write2BulkEntry;
import mxaccess_gateway.v1.MxaccessGateway.WriteBulkEntry;
import mxaccess_gateway.v1.MxaccessGateway.WriteSecured2BulkEntry;
import mxaccess_gateway.v1.MxaccessGateway.WriteSecuredBulkEntry;
import org.junit.jupiter.api.Test;
final class MxGatewayCliTests {
@@ -124,6 +130,25 @@ final class MxGatewayCliTests {
assertTrue(run.output().contains("\"tagAddress\":\"TestMachine_002.TestChangingInt\""));
}
@Test
void deployEventSequenceRendersAsUnsignedForHighUint64() {
// Client.Java-020 regression: galaxy-watch text output now uses
// Long.toUnsignedString to format the proto uint64 sequence field, so
// values past 2^63 render as positive decimal strings instead of the
// negative signed-long interpretation the old "%d" produced.
long highUnsigned = -1L; // bit-pattern for 2^64 - 1, i.e. 18446744073709551615 unsigned
String text = String.format(
"seq=%s observed=%s deployTime=%s objects=%d attributes=%d",
Long.toUnsignedString(highUnsigned),
"2026-05-20T00:00:00Z",
"(none)",
0,
0);
assertTrue(text.contains("seq=18446744073709551615"), "expected unsigned rendering, got: " + text);
assertFalse(text.contains("seq=-1"), "must not render as signed -1");
}
@Test
void unsubscribeBulkCommandPrintsResults() {
CliRun run = execute(
@@ -297,6 +322,31 @@ final class MxGatewayCliTests {
return results;
}
@Override
public List<BulkReadResult> readBulk(int serverHandle, List<String> items, int timeoutMs) {
return new ArrayList<>();
}
@Override
public List<BulkWriteResult> writeBulk(int serverHandle, List<WriteBulkEntry> entries) {
return new ArrayList<>();
}
@Override
public List<BulkWriteResult> write2Bulk(int serverHandle, List<Write2BulkEntry> entries) {
return new ArrayList<>();
}
@Override
public List<BulkWriteResult> writeSecuredBulk(int serverHandle, List<WriteSecuredBulkEntry> entries) {
return new ArrayList<>();
}
@Override
public List<BulkWriteResult> writeSecured2Bulk(int serverHandle, List<WriteSecured2BulkEntry> entries) {
return new ArrayList<>();
}
@Override
public com.dohertylan.mxgateway.client.MxEventStream streamEventsAfter(long afterWorkerSequence) {
throw new UnsupportedOperationException("stream-events is covered by client tests");
@@ -21,7 +21,7 @@ import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* Thin wrapper around the generated {@link GalaxyRepositoryGrpc} stubs that
@@ -128,10 +128,14 @@ public final class GalaxyRepositoryClient implements AutoCloseable {
* exceptionally with {@link MxGatewayException} on failure
*/
public CompletableFuture<Boolean> testConnectionAsync() {
// Apply the projection inside toCompletable rather than via .thenApply
// so the user-visible future is the same future cancellation is bound
// to; a downstream .thenApply stage would not forward cancel() to the
// source RPC.
return MxGatewayChannels.toCompletable(
rawFutureStub().testConnection(TestConnectionRequest.getDefaultInstance()),
"galaxy test connection")
.thenApply(TestConnectionReply::getOk);
rawFutureStub().testConnection(TestConnectionRequest.getDefaultInstance()),
"galaxy test connection",
TestConnectionReply::getOk);
}
/**
@@ -163,10 +167,9 @@ public final class GalaxyRepositoryClient implements AutoCloseable {
*/
public CompletableFuture<Optional<Instant>> getLastDeployTimeAsync() {
return MxGatewayChannels.toCompletable(
rawFutureStub().getLastDeployTime(GetLastDeployTimeRequest.getDefaultInstance()),
"galaxy get last deploy time")
.thenApply(MxGatewayChannels.normalisingValidator(
"galaxy get last deploy time", GalaxyRepositoryClient::mapDeployTime));
rawFutureStub().getLastDeployTime(GetLastDeployTimeRequest.getDefaultInstance()),
"galaxy get last deploy time",
GalaxyRepositoryClient::mapDeployTime);
}
/**
@@ -210,7 +213,33 @@ public final class GalaxyRepositoryClient implements AutoCloseable {
* exceptionally with {@link MxGatewayException} on failure
*/
public CompletableFuture<List<GalaxyObject>> discoverHierarchyAsync() {
return discoverHierarchyPageAsync("", new java.util.ArrayList<>(), new java.util.HashSet<>());
// The recursive page chain produces a fresh in-flight RPC per page.
// Track the current in-flight stage in an AtomicReference and return a
// user-facing future whose cancel() forwards to that current stage
// otherwise cancelling the chained CompletableFuture would not abort
// the in-flight gRPC call. Without this, .thenCompose creates new
// stages whose cancel() does not propagate upstream.
AtomicReference<CompletableFuture<?>> currentStage = new AtomicReference<>();
CompletableFuture<List<GalaxyObject>> userFuture = new CompletableFuture<>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning);
CompletableFuture<?> stage = currentStage.get();
if (stage != null) {
stage.cancel(mayInterruptIfRunning);
}
return cancelled;
}
};
discoverHierarchyPageAsync("", new java.util.ArrayList<>(), new java.util.HashSet<>(), currentStage)
.whenComplete((result, error) -> {
if (error != null) {
userFuture.completeExceptionally(error);
} else {
userFuture.complete(result);
}
});
return userFuture;
}
/**
@@ -275,43 +304,30 @@ public final class GalaxyRepositoryClient implements AutoCloseable {
* callers do not leave in-flight calls or Netty event-loop threads running
* after the block exits.
*
* <p>Waits up to the configured connect timeout for graceful termination
* and forcibly shuts the channel down on timeout. If the calling thread is
* interrupted while waiting, the channel is forcibly shut down and the
* thread's interrupt flag is restored. No-op for clients that do not own
* their channel. For an explicitly checked, blocking-aware shutdown call
* {@link #closeAndAwaitTermination()}.
* <p>Waits up to {@link MxGatewayClientOptions#shutdownTimeout()} for
* graceful termination and forcibly shuts the channel down on timeout. If
* the calling thread is interrupted while waiting, the channel is forcibly
* shut down and the thread's interrupt flag is restored. No-op for clients
* that do not own their channel. For an explicitly checked, blocking-aware
* shutdown call {@link #closeAndAwaitTermination()}. Delegates to the
* shared {@link MxGatewayChannels#shutdown} so behavior stays in lockstep
* with {@link MxGatewayClient}.
*/
@Override
public void close() {
if (ownedChannel == null) {
return;
}
ownedChannel.shutdown();
try {
if (!ownedChannel.awaitTermination(options.connectTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
ownedChannel.shutdownNow();
}
} catch (InterruptedException error) {
ownedChannel.shutdownNow();
Thread.currentThread().interrupt();
}
MxGatewayChannels.shutdown(ownedChannel, options);
}
/**
* Shuts the owned channel down and waits up to the configured connect
* timeout for termination, forcibly shutting it down on timeout. No-op
* for clients that do not own their channel.
* Shuts the owned channel down and waits up to
* {@link MxGatewayClientOptions#shutdownTimeout()} for termination,
* forcibly shutting it down on timeout. No-op for clients that do not own
* their channel.
*
* @throws InterruptedException if the calling thread is interrupted while waiting
*/
public void closeAndAwaitTermination() throws InterruptedException {
if (ownedChannel != null) {
ownedChannel.shutdown();
if (!ownedChannel.awaitTermination(options.connectTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
ownedChannel.shutdownNow();
}
}
MxGatewayChannels.shutdownAndAwaitTermination(ownedChannel, options);
}
private static Optional<Instant> mapDeployTime(GetLastDeployTimeReply reply) {
@@ -323,25 +339,33 @@ public final class GalaxyRepositoryClient implements AutoCloseable {
}
private CompletableFuture<List<GalaxyObject>> discoverHierarchyPageAsync(
String pageToken, java.util.ArrayList<GalaxyObject> objects, java.util.HashSet<String> seenPageTokens) {
String pageToken,
java.util.ArrayList<GalaxyObject> objects,
java.util.HashSet<String> seenPageTokens,
AtomicReference<CompletableFuture<?>> currentStage) {
DiscoverHierarchyRequest request = DiscoverHierarchyRequest.newBuilder()
.setPageSize(DISCOVER_HIERARCHY_PAGE_SIZE)
.setPageToken(pageToken)
.build();
return MxGatewayChannels.toCompletable(rawFutureStub().discoverHierarchy(request), "galaxy discover hierarchy")
.thenCompose(reply -> {
objects.addAll(reply.getObjectsList());
if (reply.getNextPageToken().isBlank()) {
return CompletableFuture.completedFuture(objects);
}
if (!seenPageTokens.add(reply.getNextPageToken())) {
CompletableFuture<List<GalaxyObject>> failed = new CompletableFuture<>();
failed.completeExceptionally(new MxGatewayException(
"galaxy discover hierarchy returned repeated page token: "
+ reply.getNextPageToken()));
return failed;
}
return discoverHierarchyPageAsync(reply.getNextPageToken(), objects, seenPageTokens);
});
CompletableFuture<DiscoverHierarchyReply> pageFuture = MxGatewayChannels.toCompletable(
rawFutureStub().discoverHierarchy(request), "galaxy discover hierarchy");
// Publish the in-flight page future so a user cancellation can abort
// the current outstanding RPC (the recursion replaces this reference
// before each subsequent page).
currentStage.set(pageFuture);
return pageFuture.thenCompose(reply -> {
objects.addAll(reply.getObjectsList());
if (reply.getNextPageToken().isBlank()) {
return CompletableFuture.completedFuture(objects);
}
if (!seenPageTokens.add(reply.getNextPageToken())) {
CompletableFuture<List<GalaxyObject>> failed = new CompletableFuture<>();
failed.completeExceptionally(new MxGatewayException(
"galaxy discover hierarchy returned repeated page token: "
+ reply.getNextPageToken()));
return failed;
}
return discoverHierarchyPageAsync(reply.getNextPageToken(), objects, seenPageTokens, currentStage);
});
}
}
@@ -25,14 +25,17 @@ import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest;
* <p><strong>Backpressure (fail-fast):</strong> this adaptor relies on gRPC's
* default auto-inbound flow control the async stub auto-requests messages, so
* the gateway can push events faster than the consumer drains the bounded
* 16-element buffer. There is intentionally <em>no</em> real client flow
* control: a consumer that stalls long enough to let the buffer fill triggers
* an immediate overflow that cancels the subscription and surfaces an
* {@link MxGatewayException} on the next {@link #next()} call. This matches the
* gateway's documented fail-fast event-backpressure design a slow consumer
* loses its subscription rather than silently dropping events. Consumers that
* cannot keep up must drain {@link #next()} promptly (e.g. hand events to their
* own larger queue) and be prepared to resubscribe with a resume cursor.
* 1024-element buffer (the buffer capacity is a constructor parameter; the
* production caller {@code MxGatewayClient.streamEvents} passes {@code 1024} to
* absorb the gateway's session-backlog replay burst). There is intentionally
* <em>no</em> real client flow control: a consumer that stalls long enough to
* let the buffer fill triggers an immediate overflow that cancels the
* subscription and surfaces an {@link MxGatewayException} on the next
* {@link #next()} call. This matches the gateway's documented fail-fast
* event-backpressure design a slow consumer loses its subscription rather
* than silently dropping events. Consumers that cannot keep up must drain
* {@link #next()} promptly (e.g. hand events to their own larger queue) and be
* prepared to resubscribe with a resume cursor.
*
* <p><strong>Threading:</strong> the iterator methods ({@link #hasNext()} and
* {@link #next()}) are <em>not</em> thread-safe and must be driven by a single
@@ -60,7 +63,16 @@ public final class MxEventStream implements Iterator<MxEvent>, AutoCloseable {
return new ClientResponseObserver<>() {
@Override
public void beforeStart(ClientCallStreamObserver<StreamEventsRequest> requestStream) {
// Resolve the close()/beforeStart() race the same way DeployEventStream does:
// store the request stream first, then check the close flag and cancel the
// call if a prior close() already fired. Without this, a close() that ran
// before the gRPC call attached its ClientCallStreamObserver would skip
// stream.cancel() (because requestStream is still null) and beforeStart()
// arriving afterwards would leak the underlying call open.
MxEventStream.this.requestStream = requestStream;
if (closed) {
requestStream.cancel("client cancelled event stream", null);
}
}
@Override
@@ -98,19 +98,86 @@ final class MxGatewayChannels {
return stub.withDeadlineAfter(options.streamTimeout().toNanos(), TimeUnit.NANOSECONDS);
}
/**
* Shuts a client-owned channel down and waits up to the configured
* {@link MxGatewayClientOptions#shutdownTimeout()} for graceful
* termination, forcing {@code shutdownNow()} on timeout. If the calling
* thread is interrupted while waiting, the channel is forcibly shut down
* and the thread's interrupt flag is restored this matches the
* try-with-resources {@code close()} contract that cannot throw a checked
* exception.
*
* <p>No-op when {@code ownedChannel} is {@code null} (i.e. the caller owns
* the channel lifecycle on a borrowed channel).
*
* @param ownedChannel the channel to shut down, may be {@code null}
* @param options the client options carrying the shutdown timeout
*/
static void shutdown(ManagedChannel ownedChannel, MxGatewayClientOptions options) {
if (ownedChannel == null) {
return;
}
ownedChannel.shutdown();
try {
if (!ownedChannel.awaitTermination(options.shutdownTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
ownedChannel.shutdownNow();
}
} catch (InterruptedException error) {
ownedChannel.shutdownNow();
Thread.currentThread().interrupt();
}
}
/**
* Shuts a client-owned channel down and waits up to the configured
* {@link MxGatewayClientOptions#shutdownTimeout()} for termination,
* forcing {@code shutdownNow()} on timeout. Throws
* {@link InterruptedException} when the calling thread is interrupted
* for callers that want a checked, blocking-aware shutdown.
*
* <p>No-op when {@code ownedChannel} is {@code null}.
*
* @param ownedChannel the channel to shut down, may be {@code null}
* @param options the client options carrying the shutdown timeout
* @throws InterruptedException if the calling thread is interrupted while waiting
*/
static void shutdownAndAwaitTermination(ManagedChannel ownedChannel, MxGatewayClientOptions options)
throws InterruptedException {
if (ownedChannel == null) {
return;
}
ownedChannel.shutdown();
if (!ownedChannel.awaitTermination(options.shutdownTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
ownedChannel.shutdownNow();
}
}
/**
* Bridges a Guava {@link ListenableFuture} to a {@link CompletableFuture},
* normalising any failure through {@link MxGatewayErrors#fromGrpc} so the
* async error surface matches the synchronous methods. Cancelling the
* returned future cancels the source RPC.
*
* <p><strong>Cancellation contract:</strong> the returned future is a
* {@link CancellingCompletableFuture} that overrides
* {@link CompletableFuture#cancel(boolean)} so cancellation always forwards
* to the source {@link ListenableFuture}, even when callers wrap the
* future in additional {@code thenApply}/{@code thenCompose} stages. The
* historical {@code whenComplete}-based forwarder was buggy because
* {@code thenApply} returns a new {@code CompletableFuture} whose
* cancellation does <em>not</em> propagate back to this future; with the
* override-based design, calling {@code cancel(true)} on either the
* direct return value or the user-facing chained future ultimately
* invokes {@code source.cancel(true)} (chained futures forward to the
* upstream stage they were derived from, which is this future).
*
* @param source the gRPC future-stub result
* @param operation the operation name used in normalised error messages
* @param <T> the reply type
* @return a completable future mirroring the source
*/
static <T> CompletableFuture<T> toCompletable(ListenableFuture<T> source, String operation) {
CompletableFuture<T> target = new CompletableFuture<>();
CancellingCompletableFuture<T> target = new CancellingCompletableFuture<>(source);
Futures.addCallback(
source,
new FutureCallback<>() {
@@ -129,14 +196,83 @@ final class MxGatewayChannels {
}
},
MoreExecutors.directExecutor());
target.whenComplete((ignoredResult, ignoredError) -> {
if (target.isCancelled()) {
source.cancel(true);
}
});
return target;
}
/**
* Bridges a Guava {@link ListenableFuture} to a {@link CompletableFuture}
* and applies {@code validator} to the reply inline (i.e. without a
* downstream {@code thenApply}), so the user-visible future is the same
* future cancellation is bound to. Any non-{@link MxGatewayException}
* {@link RuntimeException} thrown by {@code validator} is routed through
* {@link MxGatewayErrors#fromGrpc} to match the synchronous error surface.
*
* <p>This overload exists because the prior {@code toCompletable(...)
* .thenApply(validator)} pattern broke cancellation propagation: the
* future returned by {@code thenApply} is a new stage whose cancellation
* does not propagate to the underlying gRPC call. Using this overload, the
* single returned future is the one users hold, so calling {@code cancel}
* on it forwards to the source RPC.
*
* @param source the gRPC future-stub result
* @param operation the operation name used in normalised error messages
* @param validator the validating/transforming function applied to the reply
* @param <T> the reply type
* @param <R> the validated/transformed result type
* @return a completable future mirroring the validated source
*/
static <T, R> CompletableFuture<R> toCompletable(
ListenableFuture<T> source, String operation, Function<T, R> validator) {
CancellingCompletableFuture<R> target = new CancellingCompletableFuture<>(source);
Futures.addCallback(
source,
new FutureCallback<>() {
@Override
public void onSuccess(T result) {
try {
target.complete(validator.apply(result));
} catch (MxGatewayException error) {
target.completeExceptionally(error);
} catch (RuntimeException error) {
target.completeExceptionally(MxGatewayErrors.fromGrpc(operation, error));
}
}
@Override
public void onFailure(Throwable error) {
if (error instanceof RuntimeException runtimeException) {
target.completeExceptionally(MxGatewayErrors.fromGrpc(operation, runtimeException));
return;
}
target.completeExceptionally(error);
}
},
MoreExecutors.directExecutor());
return target;
}
/**
* {@link CompletableFuture} subclass that forwards {@link #cancel(boolean)}
* to a backing {@link ListenableFuture}. Used by {@link #toCompletable} so
* cancelling the user-visible future cancels the underlying gRPC call.
*/
static final class CancellingCompletableFuture<T> extends CompletableFuture<T> {
private final ListenableFuture<?> source;
CancellingCompletableFuture(ListenableFuture<?> source) {
this.source = source;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning);
// Always forward; the source future is idempotent on cancel and the
// user contract is that cancelling the future cancels the RPC.
source.cancel(mayInterruptIfRunning);
return cancelled;
}
}
/**
* Adapts a reply-validating function for use inside {@code thenApply} so
* any non-{@link MxGatewayException} {@link RuntimeException} it raises is
@@ -7,7 +7,6 @@ import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import mxaccess_gateway.v1.MxAccessGatewayGrpc;
import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmReply;
import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmRequest;
@@ -181,13 +180,16 @@ public final class MxGatewayClient implements AutoCloseable {
* with {@link MxGatewayException} on failure
*/
public CompletableFuture<OpenSessionReply> openSessionAsync(OpenSessionRequest request) {
CompletableFuture<OpenSessionReply> future =
MxGatewayChannels.toCompletable(rawFutureStub().openSession(request), "open session");
return future.thenApply(MxGatewayChannels.normalisingValidator("open session", reply -> {
MxGatewayErrors.ensureProtocolSuccess("open session", reply.getProtocolStatus(), null);
ensureGatewayProtocolCompatible(reply);
return reply;
}));
// Apply the validator inside toCompletable rather than via .thenApply so
// cancellation on the returned future forwards to the source RPC (a
// .thenApply stage returns a fresh CompletableFuture whose cancel()
// does not propagate back to the upstream stage).
return MxGatewayChannels.toCompletable(
rawFutureStub().openSession(request), "open session", reply -> {
MxGatewayErrors.ensureProtocolSuccess("open session", reply.getProtocolStatus(), null);
ensureGatewayProtocolCompatible(reply);
return reply;
});
}
/**
@@ -222,13 +224,11 @@ public final class MxGatewayClient implements AutoCloseable {
* on failure
*/
public CompletableFuture<MxCommandReply> invokeAsync(MxCommandRequest request) {
CompletableFuture<MxCommandReply> future =
MxGatewayChannels.toCompletable(rawFutureStub().invoke(request), "invoke");
return future.thenApply(MxGatewayChannels.normalisingValidator("invoke", reply -> {
return MxGatewayChannels.toCompletable(rawFutureStub().invoke(request), "invoke", reply -> {
MxGatewayErrors.ensureProtocolSuccess("invoke", reply.getProtocolStatus(), reply);
MxGatewayErrors.ensureMxAccessSuccess("invoke", reply);
return reply;
}));
});
}
/**
@@ -320,12 +320,11 @@ public final class MxGatewayClient implements AutoCloseable {
* with {@link MxGatewayException} on failure
*/
public CompletableFuture<AcknowledgeAlarmReply> acknowledgeAlarmAsync(AcknowledgeAlarmRequest request) {
CompletableFuture<AcknowledgeAlarmReply> future =
MxGatewayChannels.toCompletable(rawFutureStub().acknowledgeAlarm(request), "acknowledge alarm");
return future.thenApply(MxGatewayChannels.normalisingValidator("acknowledge alarm", reply -> {
MxGatewayErrors.ensureProtocolSuccess("acknowledge alarm", reply.getProtocolStatus(), null);
return reply;
}));
return MxGatewayChannels.toCompletable(
rawFutureStub().acknowledgeAlarm(request), "acknowledge alarm", reply -> {
MxGatewayErrors.ensureProtocolSuccess("acknowledge alarm", reply.getProtocolStatus(), null);
return reply;
});
}
/**
@@ -351,43 +350,30 @@ public final class MxGatewayClient implements AutoCloseable {
* callers do not leave in-flight calls or Netty event-loop threads running
* after the block exits.
*
* <p>Waits up to the configured connect timeout for graceful termination
* and forcibly shuts the channel down on timeout. If the calling thread is
* interrupted while waiting, the channel is forcibly shut down and the
* thread's interrupt flag is restored. No-op for clients that do not own
* their channel. For an explicitly checked, blocking-aware shutdown call
* {@link #closeAndAwaitTermination()}.
* <p>Waits up to {@link MxGatewayClientOptions#shutdownTimeout()} for
* graceful termination and forcibly shuts the channel down on timeout. If
* the calling thread is interrupted while waiting, the channel is forcibly
* shut down and the thread's interrupt flag is restored. No-op for clients
* that do not own their channel. For an explicitly checked, blocking-aware
* shutdown call {@link #closeAndAwaitTermination()}. Delegates to the
* shared {@link MxGatewayChannels#shutdown} so behavior stays in lockstep
* with {@link GalaxyRepositoryClient}.
*/
@Override
public void close() {
if (ownedChannel == null) {
return;
}
ownedChannel.shutdown();
try {
if (!ownedChannel.awaitTermination(options.connectTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
ownedChannel.shutdownNow();
}
} catch (InterruptedException error) {
ownedChannel.shutdownNow();
Thread.currentThread().interrupt();
}
MxGatewayChannels.shutdown(ownedChannel, options);
}
/**
* Shuts the owned channel down and waits up to the configured connect
* timeout for termination, forcibly shutting it down on timeout. No-op
* for clients that do not own their channel.
* Shuts the owned channel down and waits up to
* {@link MxGatewayClientOptions#shutdownTimeout()} for termination,
* forcibly shutting it down on timeout. No-op for clients that do not own
* their channel.
*
* @throws InterruptedException if the calling thread is interrupted while waiting
*/
public void closeAndAwaitTermination() throws InterruptedException {
if (ownedChannel != null) {
ownedChannel.shutdown();
if (!ownedChannel.awaitTermination(options.connectTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
ownedChannel.shutdownNow();
}
}
MxGatewayChannels.shutdownAndAwaitTermination(ownedChannel, options);
}
static ProtocolStatusCode okStatusCode() {
@@ -14,6 +14,7 @@ import java.util.Objects;
public final class MxGatewayClientOptions {
private static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofSeconds(10);
private static final Duration DEFAULT_CALL_TIMEOUT = Duration.ofSeconds(30);
private static final Duration DEFAULT_SHUTDOWN_TIMEOUT = Duration.ofSeconds(10);
private static final int DEFAULT_MAX_GRPC_MESSAGE_BYTES = 16 * 1024 * 1024;
private final String endpoint;
@@ -24,6 +25,7 @@ public final class MxGatewayClientOptions {
private final Duration connectTimeout;
private final Duration callTimeout;
private final Duration streamTimeout;
private final Duration shutdownTimeout;
private final int maxGrpcMessageBytes;
private MxGatewayClientOptions(Builder builder) {
@@ -35,6 +37,7 @@ public final class MxGatewayClientOptions {
connectTimeout = builder.connectTimeout == null ? DEFAULT_CONNECT_TIMEOUT : builder.connectTimeout;
callTimeout = builder.callTimeout == null ? DEFAULT_CALL_TIMEOUT : builder.callTimeout;
streamTimeout = builder.streamTimeout;
shutdownTimeout = builder.shutdownTimeout == null ? DEFAULT_SHUTDOWN_TIMEOUT : builder.shutdownTimeout;
maxGrpcMessageBytes = builder.maxGrpcMessageBytes <= 0
? DEFAULT_MAX_GRPC_MESSAGE_BYTES
: builder.maxGrpcMessageBytes;
@@ -131,6 +134,18 @@ public final class MxGatewayClientOptions {
return streamTimeout;
}
/**
* Returns the upper bound on graceful shutdown waiting, applied by
* {@code close()} and {@code closeAndAwaitTermination()}. Independent of
* {@link #connectTimeout()}; a small connect timeout no longer forces an
* aggressive {@code shutdownNow()} on in-flight calls.
*
* @return the shutdown timeout duration
*/
public Duration shutdownTimeout() {
return shutdownTimeout;
}
public int maxGrpcMessageBytes() {
return maxGrpcMessageBytes;
}
@@ -157,6 +172,8 @@ public final class MxGatewayClientOptions {
+ callTimeout
+ ", streamTimeout="
+ streamTimeout
+ ", shutdownTimeout="
+ shutdownTimeout
+ ", maxGrpcMessageBytes="
+ maxGrpcMessageBytes
+ '}';
@@ -181,6 +198,7 @@ public final class MxGatewayClientOptions {
private Duration connectTimeout;
private Duration callTimeout;
private Duration streamTimeout;
private Duration shutdownTimeout;
private int maxGrpcMessageBytes;
private Builder() {
@@ -277,6 +295,20 @@ public final class MxGatewayClientOptions {
return this;
}
/**
* Sets the upper bound on graceful shutdown waiting (applied by
* {@code close()} and {@code closeAndAwaitTermination()}). Defaults to
* 10 s and is independent of the connect timeout.
*
* @param value the shutdown timeout, must be non-{@code null}
* @return this builder
* @throws NullPointerException if {@code value} is {@code null}
*/
public Builder shutdownTimeout(Duration value) {
shutdownTimeout = Objects.requireNonNull(value, "shutdownTimeout");
return this;
}
public Builder maxGrpcMessageBytes(int value) {
maxGrpcMessageBytes = value;
return this;
@@ -1,5 +1,7 @@
package com.dohertylan.mxgateway.client;
import java.util.regex.Pattern;
/**
* Helpers for redacting secrets such as gateway API keys from log output.
*
@@ -7,6 +9,16 @@ package com.dohertylan.mxgateway.client;
* produce shortened, masked forms safe for diagnostic messages.
*/
public final class MxGatewaySecrets {
// Match any gateway-shaped credential anywhere in the string, regardless of
// surrounding punctuation: quoted, colon/comma-delimited, embedded in URLs
// or parens. The underscore-separated character class also covers a
// trailing hyphen in case a future key format introduces one.
private static final Pattern MXGW_TOKEN = Pattern.compile("mxgw_[A-Za-z0-9_-]+");
// Mask the token after a Bearer marker as a unit so callers cannot
// accidentally leak the secret when the surrounding text is a header-style
// string (e.g. "Bearer mxgw_id_secret").
private static final Pattern BEARER_TOKEN = Pattern.compile("(?i)bearer\\s+\\S+");
private MxGatewaySecrets() {
}
@@ -43,9 +55,15 @@ public final class MxGatewaySecrets {
}
/**
* Replaces gateway-style credential tokens (the {@code mxgw_} prefix and
* any {@code Bearer} marker) inside a free-form string with a redaction
* placeholder.
* Replaces gateway-style credential tokens inside a free-form string with a
* redaction placeholder.
*
* <p>Matches any {@code mxgw_<...>} token anywhere in the string,
* irrespective of surrounding punctuation (whitespace, colons, commas,
* single/double quotes, parentheses, embedded URL paths). Also masks the
* argument of an authorization-header style {@code Bearer <token>} marker
* as a unit so the token cannot leak through when the surrounding string
* is a raw header value.
*
* @param value the string to scrub, may be {@code null}
* @return an empty string for {@code null}, the original value when blank,
@@ -56,12 +74,8 @@ public final class MxGatewaySecrets {
return value == null ? "" : value;
}
String[] parts = value.split("\\s+");
for (int index = 0; index < parts.length; index++) {
if (parts[index].startsWith("mxgw_") || parts[index].equalsIgnoreCase("bearer")) {
parts[index] = "<redacted>";
}
}
return String.join(" ", parts);
String scrubbed = MXGW_TOKEN.matcher(value).replaceAll("<redacted>");
scrubbed = BEARER_TOKEN.matcher(scrubbed).replaceAll("Bearer <redacted>");
return scrubbed;
}
}
@@ -106,6 +106,37 @@ final class MxGatewayFixtureTests {
assertFalse(authError.getMessage().contains("visible_secret"));
}
@Test
void redactCredentialsHandlesNonWhitespaceDelimitedTokens() {
// Client.Java-018 regression: the previous whitespace-split scrub left
// mxgw_ credentials attached to quotes, commas, colons, parens, and
// URL paths intact. The strengthened pattern matches mxgw_<...>
// anywhere in the string regardless of surrounding punctuation.
String singleQuoted = MxGatewaySecrets.redactCredentials("authentication failed: 'mxgw_keyid_secret'");
String doubleQuoted = MxGatewaySecrets.redactCredentials("Bearer:\"mxgw_keyid_secret\"");
String commaDelimited = MxGatewaySecrets.redactCredentials("token=mxgw_keyid_secret,scope=admin");
String colonDelimited = MxGatewaySecrets.redactCredentials("Bearer:mxgw_keyid_secret");
String parenthesised = MxGatewaySecrets.redactCredentials("auth(mxgw_keyid_secret)");
String urlEmbedded = MxGatewaySecrets.redactCredentials("https://gw/api?key=mxgw_keyid_secret&x=1");
String bearerHeader = MxGatewaySecrets.redactCredentials("Bearer mxgw_keyid_secret");
for (String redacted : new String[] {
singleQuoted, doubleQuoted, commaDelimited, colonDelimited, parenthesised, urlEmbedded, bearerHeader
}) {
assertFalse(redacted.contains("mxgw_keyid_secret"), "expected redaction, got: " + redacted);
assertFalse(redacted.contains("keyid_secret"), "tail leaked: " + redacted);
assertTrue(redacted.contains("<redacted>"), "expected <redacted>, got: " + redacted);
}
}
@Test
void redactCredentialsLeavesBenignContentAlone() {
assertEquals(
"no credentials here",
MxGatewaySecrets.redactCredentials("no credentials here"));
assertEquals("", MxGatewaySecrets.redactCredentials(null));
}
private static JsonObject readFixture(String relativePath) throws Exception {
return JsonParser.parseString(Files.readString(fixtureRoot().resolve(relativePath))).getAsJsonObject();
}
@@ -0,0 +1,182 @@
package com.dohertylan.mxgateway.client;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import io.grpc.MethodDescriptor;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
/**
* Regression tests for the second-pass Low-severity Client.Java findings
* Client.Java-016, Client.Java-019, and the shared shutdown helpers extracted
* to {@link MxGatewayChannels}.
*/
final class MxGatewayLowFindingsIITests {
// --- Client.Java-019: shutdown timeout is independent of connect timeout ---
@Test
void shutdownAndAwaitTerminationHonoursShutdownTimeoutNotConnectTimeout() throws Exception {
// The historical bug: close() used connectTimeout as the awaitTermination
// deadline, so a small connectTimeout forced a premature shutdownNow()
// on in-flight calls. The fix uses a dedicated shutdownTimeout. This
// test verifies the helper waits up to shutdownTimeout (1s) even when
// connectTimeout is set to a tiny value (50ms).
RecordingChannel channel = new RecordingChannel(/* terminatesAfterMillis = */ 200);
MxGatewayClientOptions options = MxGatewayClientOptions.builder()
.endpoint("in-process")
.plaintext(true)
.connectTimeout(Duration.ofMillis(50))
.shutdownTimeout(Duration.ofSeconds(1))
.build();
long start = System.nanoTime();
MxGatewayChannels.shutdownAndAwaitTermination(channel, options);
long elapsedMillis = (System.nanoTime() - start) / 1_000_000L;
// The channel finished orderly termination within the shutdown timeout
// window, so shutdownNow() must NOT have been called. With the old
// implementation a 50ms connect-timeout-as-shutdown-deadline would
// have escalated to shutdownNow() before the channel's 200ms graceful
// termination completed.
assertTrue(channel.shutdownCalled, "shutdown() must be called");
assertFalse(
channel.shutdownNowCalled,
"graceful termination finished within shutdownTimeout; shutdownNow() must not have been called");
// Allow ample slack for build-machine variance but assert we waited at
// least the channel's graceful-termination window.
assertTrue(elapsedMillis >= 150, "should have waited for graceful termination, elapsed=" + elapsedMillis);
}
@Test
void shutdownEscalatesToShutdownNowWhenTimeoutExceeded() {
// The other half of the contract: a channel that does not terminate
// within the shutdownTimeout window must be forcibly shut down.
RecordingChannel channel = new RecordingChannel(/* terminatesAfterMillis = */ 5_000);
MxGatewayClientOptions options = MxGatewayClientOptions.builder()
.endpoint("in-process")
.plaintext(true)
.shutdownTimeout(Duration.ofMillis(100))
.build();
MxGatewayChannels.shutdown(channel, options);
assertTrue(channel.shutdownCalled);
assertTrue(channel.shutdownNowCalled, "stuck channel must be forcibly shut down");
}
@Test
void shutdownTimeoutDefaultIsTenSecondsIndependentOfConnectTimeout() {
MxGatewayClientOptions defaults = MxGatewayClientOptions.builder()
.endpoint("in-process")
.build();
// Default is 10s; an unset connectTimeout-of-10s default coincides but
// the two are now independent options.
assertEquals(Duration.ofSeconds(10), defaults.shutdownTimeout());
MxGatewayClientOptions tinyConnect = MxGatewayClientOptions.builder()
.endpoint("in-process")
.connectTimeout(Duration.ofMillis(500))
.build();
assertEquals(Duration.ofSeconds(10), tinyConnect.shutdownTimeout(),
"shutdownTimeout default is independent of connectTimeout");
}
// --- Client.Java-016: shared shutdown helpers behave identically for both clients ---
@Test
void sharedShutdownHelperIsNoOpForNullChannel() throws Exception {
MxGatewayClientOptions options = MxGatewayClientOptions.builder()
.endpoint("in-process")
.plaintext(true)
.shutdownTimeout(Duration.ofMillis(50))
.build();
// Both helpers must tolerate a null owned-channel (caller-managed channel case).
MxGatewayChannels.shutdown(null, options);
MxGatewayChannels.shutdownAndAwaitTermination(null, options);
}
/**
* Test double for {@link ManagedChannel} that records {@code shutdown}/
* {@code shutdownNow} invocations and simulates an orderly termination
* after a configurable delay. Avoids the heavy in-process gRPC machinery
* the shutdown helpers only touch the three lifecycle methods.
*/
private static final class RecordingChannel extends ManagedChannel {
private final long terminatesAfterMillis;
private final long createdAtNanos;
private volatile boolean shutdownCalled;
private volatile boolean shutdownNowCalled;
RecordingChannel(long terminatesAfterMillis) {
this.terminatesAfterMillis = terminatesAfterMillis;
this.createdAtNanos = System.nanoTime();
}
@Override
public ManagedChannel shutdown() {
shutdownCalled = true;
return this;
}
@Override
public boolean isShutdown() {
return shutdownCalled || shutdownNowCalled;
}
@Override
public boolean isTerminated() {
if (shutdownNowCalled) {
return true;
}
if (!shutdownCalled) {
return false;
}
long elapsed = (System.nanoTime() - createdAtNanos) / 1_000_000L;
return elapsed >= terminatesAfterMillis;
}
@Override
public ManagedChannel shutdownNow() {
shutdownNowCalled = true;
return this;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
long deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
while (System.nanoTime() < deadlineNanos) {
if (isTerminated()) {
return true;
}
long remaining = Math.max(1, (deadlineNanos - System.nanoTime()) / 1_000_000L);
Thread.sleep(Math.min(remaining, 10));
}
return isTerminated();
}
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
throw new UnsupportedOperationException("no RPCs are issued in shutdown tests");
}
@Override
public String authority() {
return "in-process";
}
@Override
public ConnectivityState getState(boolean requestConnection) {
return ConnectivityState.IDLE;
}
}
}
@@ -13,6 +13,7 @@ import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import mxaccess_gateway.v1.MxAccessGatewayGrpc;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
@@ -27,7 +28,7 @@ import org.junit.jupiter.api.Test;
/**
* Regression tests for the Medium-severity Client.Java code-review findings
* (Client.Java-001 through Client.Java-005).
* (Client.Java-001 through Client.Java-005, and Client.Java-014/015).
*/
final class MxGatewayMediumFindingsTests {
@@ -323,6 +324,138 @@ final class MxGatewayMediumFindingsTests {
}
}
// --- Client.Java-014: MxEventStream.close() before beforeStart must cancel the call ---
@Test
void mxEventStreamCloseBeforeBeforeStartCancelsStream() {
// Mirrors GalaxyRepositoryClientTests.deployEventStreamCloseBeforeBeforeStartCancelsStream:
// if close() runs before the gRPC call has attached its ClientCallStreamObserver,
// beforeStart() must observe the prior close and cancel the underlying call so the
// gRPC subscription does not leak open after the consumer has stopped iterating.
MxEventStream stream = new MxEventStream(4);
io.grpc.stub.ClientResponseObserver<
mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest,
mxaccess_gateway.v1.MxaccessGateway.MxEvent>
observer = stream.observer();
RecordingEventsRequestStream requestStream = new RecordingEventsRequestStream();
stream.close();
observer.beforeStart(requestStream);
assertTrue(requestStream.cancelled, "beforeStart must cancel the underlying call after a prior close()");
assertEquals("client cancelled event stream", requestStream.cancelMessage);
assertFalse(stream.hasNext());
}
// --- Client.Java-015: cancelling the user-visible *Async future cancels the gRPC call ---
@Test
void invokeAsyncCancellationCancelsUnderlyingGrpcCall() throws Exception {
// Set up a gateway service that never completes the invoke call so cancellation is
// the only way the call terminates. Hook ServerCallStreamObserver.setOnCancelHandler
// to latch when the server observes cancellation.
java.util.concurrent.CountDownLatch serverCancelled = new java.util.concurrent.CountDownLatch(1);
TestService service = new TestService() {
@Override
public void invoke(MxCommandRequest request, StreamObserver<MxCommandReply> responseObserver) {
io.grpc.stub.ServerCallStreamObserver<MxCommandReply> serverObserver =
(io.grpc.stub.ServerCallStreamObserver<MxCommandReply>) responseObserver;
serverObserver.setOnCancelHandler(serverCancelled::countDown);
// Intentionally never complete the call must be terminated by the client
// cancelling its future, which must propagate to the gRPC cancellation.
}
};
try (Harness harness = Harness.start(service)) {
CompletableFuture<MxCommandReply> future = harness.client().invokeAsync(MxCommandRequest.newBuilder()
.setSessionId("s-cancel")
.setCommand(mxaccess_gateway.v1.MxaccessGateway.MxCommand.newBuilder()
.setKind(MxCommandKind.MX_COMMAND_KIND_REGISTER))
.build());
// Cancellation of the user-visible future must propagate to the gRPC call.
assertTrue(future.cancel(true), "cancel(true) should return true on a pending future");
assertTrue(
serverCancelled.await(5, java.util.concurrent.TimeUnit.SECONDS),
"server must observe RPC cancellation after future.cancel(true)");
}
}
@Test
void toCompletableValidatorOverloadForwardsCancellationToSource() {
// Unit-level proof: cancel() on the future returned by the validator-aware
// toCompletable overload must call cancel(true) on the source ListenableFuture.
// This is the core fix for Client.Java-015 the validator runs inside
// toCompletable instead of via .thenApply, so the user holds the future
// that is bound to the source.
com.google.common.util.concurrent.SettableFuture<String> source =
com.google.common.util.concurrent.SettableFuture.create();
java.util.concurrent.CompletableFuture<Integer> target =
MxGatewayChannels.toCompletable(source, "noop", String::length);
assertFalse(source.isCancelled());
assertTrue(target.cancel(true));
assertTrue(source.isCancelled(), "source ListenableFuture must be cancelled");
}
@Test
void toCompletableNoValidatorOverloadForwardsCancellationToSource() {
// Regression for the no-validator overload (the historic toCompletable shape).
com.google.common.util.concurrent.SettableFuture<String> source =
com.google.common.util.concurrent.SettableFuture.create();
java.util.concurrent.CompletableFuture<String> target = MxGatewayChannels.toCompletable(source, "noop");
assertFalse(source.isCancelled());
assertTrue(target.cancel(true));
assertTrue(source.isCancelled(), "source ListenableFuture must be cancelled");
}
private static final class RecordingEventsRequestStream
extends io.grpc.stub.ClientCallStreamObserver<
mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest> {
private boolean cancelled;
private String cancelMessage;
@Override
public void cancel(String message, Throwable cause) {
cancelled = true;
cancelMessage = message;
}
@Override
public boolean isReady() {
return true;
}
@Override
public void setOnReadyHandler(Runnable onReadyHandler) {
}
@Override
public void request(int count) {
}
@Override
public void setMessageCompression(boolean enable) {
}
@Override
public void disableAutoInboundFlowControl() {
}
@Override
public void onNext(mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest value) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
}
}
private static mxaccess_gateway.v1.MxaccessGateway.MxEvent testEvent(int sequence) {
return mxaccess_gateway.v1.MxaccessGateway.MxEvent.newBuilder()
.setWorkerSequence(sequence)
@@ -8976,17 +8976,36 @@ public final class GalaxyRepositoryOuterClass extends com.google.protobuf.Genera
getFullTagReferenceBytes();
/**
* <pre>
* Raw Galaxy SQL `dbo.data_type` identifier, passed through unchanged.
* This is NOT a member of `mxaccess_gateway.v1.MxDataType` Galaxy's
* type enumeration is distinct from MXAccess's wire data-type enum and
* the two must not be cast or compared. The GalaxyRepository service is
* metadata-only and deliberately does not share types with
* mxaccess_gateway.proto. See docs/GalaxyRepository.md.
* </pre>
*
* <code>int32 mx_data_type = 3;</code>
* @return The mxDataType.
*/
int getMxDataType();
/**
* <pre>
* Human-readable name from Galaxy's `dbo.data_type` table (e.g. "Float",
* "Integer", "Boolean"). Free-form Galaxy text; not a stable enum.
* </pre>
*
* <code>string data_type_name = 4;</code>
* @return The dataTypeName.
*/
java.lang.String getDataTypeName();
/**
* <pre>
* Human-readable name from Galaxy's `dbo.data_type` table (e.g. "Float",
* "Integer", "Boolean"). Free-form Galaxy text; not a stable enum.
* </pre>
*
* <code>string data_type_name = 4;</code>
* @return The bytes for dataTypeName.
*/
@@ -9012,12 +9031,24 @@ public final class GalaxyRepositoryOuterClass extends com.google.protobuf.Genera
boolean getArrayDimensionPresent();
/**
* <pre>
* Raw Galaxy SQL attribute-category identifier, passed through unchanged.
* Galaxy-specific; not mapped to any gateway enum. See
* docs/GalaxyRepository.md.
* </pre>
*
* <code>int32 mx_attribute_category = 8;</code>
* @return The mxAttributeCategory.
*/
int getMxAttributeCategory();
/**
* <pre>
* Raw Galaxy SQL security-classification identifier, passed through
* unchanged. Galaxy-specific; not mapped to any gateway enum. See
* docs/GalaxyRepository.md.
* </pre>
*
* <code>int32 security_classification = 9;</code>
* @return The securityClassification.
*/
@@ -9156,6 +9187,15 @@ public final class GalaxyRepositoryOuterClass extends com.google.protobuf.Genera
public static final int MX_DATA_TYPE_FIELD_NUMBER = 3;
private int mxDataType_ = 0;
/**
* <pre>
* Raw Galaxy SQL `dbo.data_type` identifier, passed through unchanged.
* This is NOT a member of `mxaccess_gateway.v1.MxDataType` Galaxy's
* type enumeration is distinct from MXAccess's wire data-type enum and
* the two must not be cast or compared. The GalaxyRepository service is
* metadata-only and deliberately does not share types with
* mxaccess_gateway.proto. See docs/GalaxyRepository.md.
* </pre>
*
* <code>int32 mx_data_type = 3;</code>
* @return The mxDataType.
*/
@@ -9168,6 +9208,11 @@ public final class GalaxyRepositoryOuterClass extends com.google.protobuf.Genera
@SuppressWarnings("serial")
private volatile java.lang.Object dataTypeName_ = "";
/**
* <pre>
* Human-readable name from Galaxy's `dbo.data_type` table (e.g. "Float",
* "Integer", "Boolean"). Free-form Galaxy text; not a stable enum.
* </pre>
*
* <code>string data_type_name = 4;</code>
* @return The dataTypeName.
*/
@@ -9185,6 +9230,11 @@ public final class GalaxyRepositoryOuterClass extends com.google.protobuf.Genera
}
}
/**
* <pre>
* Human-readable name from Galaxy's `dbo.data_type` table (e.g. "Float",
* "Integer", "Boolean"). Free-form Galaxy text; not a stable enum.
* </pre>
*
* <code>string data_type_name = 4;</code>
* @return The bytes for dataTypeName.
*/
@@ -9239,6 +9289,12 @@ public final class GalaxyRepositoryOuterClass extends com.google.protobuf.Genera
public static final int MX_ATTRIBUTE_CATEGORY_FIELD_NUMBER = 8;
private int mxAttributeCategory_ = 0;
/**
* <pre>
* Raw Galaxy SQL attribute-category identifier, passed through unchanged.
* Galaxy-specific; not mapped to any gateway enum. See
* docs/GalaxyRepository.md.
* </pre>
*
* <code>int32 mx_attribute_category = 8;</code>
* @return The mxAttributeCategory.
*/
@@ -9250,6 +9306,12 @@ public final class GalaxyRepositoryOuterClass extends com.google.protobuf.Genera
public static final int SECURITY_CLASSIFICATION_FIELD_NUMBER = 9;
private int securityClassification_ = 0;
/**
* <pre>
* Raw Galaxy SQL security-classification identifier, passed through
* unchanged. Galaxy-specific; not mapped to any gateway enum. See
* docs/GalaxyRepository.md.
* </pre>
*
* <code>int32 security_classification = 9;</code>
* @return The securityClassification.
*/
@@ -9956,6 +10018,15 @@ public final class GalaxyRepositoryOuterClass extends com.google.protobuf.Genera
private int mxDataType_ ;
/**
* <pre>
* Raw Galaxy SQL `dbo.data_type` identifier, passed through unchanged.
* This is NOT a member of `mxaccess_gateway.v1.MxDataType` Galaxy's
* type enumeration is distinct from MXAccess's wire data-type enum and
* the two must not be cast or compared. The GalaxyRepository service is
* metadata-only and deliberately does not share types with
* mxaccess_gateway.proto. See docs/GalaxyRepository.md.
* </pre>
*
* <code>int32 mx_data_type = 3;</code>
* @return The mxDataType.
*/
@@ -9964,6 +10035,15 @@ public final class GalaxyRepositoryOuterClass extends com.google.protobuf.Genera
return mxDataType_;
}
/**
* <pre>
* Raw Galaxy SQL `dbo.data_type` identifier, passed through unchanged.
* This is NOT a member of `mxaccess_gateway.v1.MxDataType` Galaxy's
* type enumeration is distinct from MXAccess's wire data-type enum and
* the two must not be cast or compared. The GalaxyRepository service is
* metadata-only and deliberately does not share types with
* mxaccess_gateway.proto. See docs/GalaxyRepository.md.
* </pre>
*
* <code>int32 mx_data_type = 3;</code>
* @param value The mxDataType to set.
* @return This builder for chaining.
@@ -9976,6 +10056,15 @@ public final class GalaxyRepositoryOuterClass extends com.google.protobuf.Genera
return this;
}
/**
* <pre>
* Raw Galaxy SQL `dbo.data_type` identifier, passed through unchanged.
* This is NOT a member of `mxaccess_gateway.v1.MxDataType` Galaxy's
* type enumeration is distinct from MXAccess's wire data-type enum and
* the two must not be cast or compared. The GalaxyRepository service is
* metadata-only and deliberately does not share types with
* mxaccess_gateway.proto. See docs/GalaxyRepository.md.
* </pre>
*
* <code>int32 mx_data_type = 3;</code>
* @return This builder for chaining.
*/
@@ -9988,6 +10077,11 @@ public final class GalaxyRepositoryOuterClass extends com.google.protobuf.Genera
private java.lang.Object dataTypeName_ = "";
/**
* <pre>
* Human-readable name from Galaxy's `dbo.data_type` table (e.g. "Float",
* "Integer", "Boolean"). Free-form Galaxy text; not a stable enum.
* </pre>
*
* <code>string data_type_name = 4;</code>
* @return The dataTypeName.
*/
@@ -10004,6 +10098,11 @@ public final class GalaxyRepositoryOuterClass extends com.google.protobuf.Genera
}
}
/**
* <pre>
* Human-readable name from Galaxy's `dbo.data_type` table (e.g. "Float",
* "Integer", "Boolean"). Free-form Galaxy text; not a stable enum.
* </pre>
*
* <code>string data_type_name = 4;</code>
* @return The bytes for dataTypeName.
*/
@@ -10021,6 +10120,11 @@ public final class GalaxyRepositoryOuterClass extends com.google.protobuf.Genera
}
}
/**
* <pre>
* Human-readable name from Galaxy's `dbo.data_type` table (e.g. "Float",
* "Integer", "Boolean"). Free-form Galaxy text; not a stable enum.
* </pre>
*
* <code>string data_type_name = 4;</code>
* @param value The dataTypeName to set.
* @return This builder for chaining.
@@ -10034,6 +10138,11 @@ public final class GalaxyRepositoryOuterClass extends com.google.protobuf.Genera
return this;
}
/**
* <pre>
* Human-readable name from Galaxy's `dbo.data_type` table (e.g. "Float",
* "Integer", "Boolean"). Free-form Galaxy text; not a stable enum.
* </pre>
*
* <code>string data_type_name = 4;</code>
* @return This builder for chaining.
*/
@@ -10044,6 +10153,11 @@ public final class GalaxyRepositoryOuterClass extends com.google.protobuf.Genera
return this;
}
/**
* <pre>
* Human-readable name from Galaxy's `dbo.data_type` table (e.g. "Float",
* "Integer", "Boolean"). Free-form Galaxy text; not a stable enum.
* </pre>
*
* <code>string data_type_name = 4;</code>
* @param value The bytes for dataTypeName to set.
* @return This builder for chaining.
@@ -10156,6 +10270,12 @@ public final class GalaxyRepositoryOuterClass extends com.google.protobuf.Genera
private int mxAttributeCategory_ ;
/**
* <pre>
* Raw Galaxy SQL attribute-category identifier, passed through unchanged.
* Galaxy-specific; not mapped to any gateway enum. See
* docs/GalaxyRepository.md.
* </pre>
*
* <code>int32 mx_attribute_category = 8;</code>
* @return The mxAttributeCategory.
*/
@@ -10164,6 +10284,12 @@ public final class GalaxyRepositoryOuterClass extends com.google.protobuf.Genera
return mxAttributeCategory_;
}
/**
* <pre>
* Raw Galaxy SQL attribute-category identifier, passed through unchanged.
* Galaxy-specific; not mapped to any gateway enum. See
* docs/GalaxyRepository.md.
* </pre>
*
* <code>int32 mx_attribute_category = 8;</code>
* @param value The mxAttributeCategory to set.
* @return This builder for chaining.
@@ -10176,6 +10302,12 @@ public final class GalaxyRepositoryOuterClass extends com.google.protobuf.Genera
return this;
}
/**
* <pre>
* Raw Galaxy SQL attribute-category identifier, passed through unchanged.
* Galaxy-specific; not mapped to any gateway enum. See
* docs/GalaxyRepository.md.
* </pre>
*
* <code>int32 mx_attribute_category = 8;</code>
* @return This builder for chaining.
*/
@@ -10188,6 +10320,12 @@ public final class GalaxyRepositoryOuterClass extends com.google.protobuf.Genera
private int securityClassification_ ;
/**
* <pre>
* Raw Galaxy SQL security-classification identifier, passed through
* unchanged. Galaxy-specific; not mapped to any gateway enum. See
* docs/GalaxyRepository.md.
* </pre>
*
* <code>int32 security_classification = 9;</code>
* @return The securityClassification.
*/
@@ -10196,6 +10334,12 @@ public final class GalaxyRepositoryOuterClass extends com.google.protobuf.Genera
return securityClassification_;
}
/**
* <pre>
* Raw Galaxy SQL security-classification identifier, passed through
* unchanged. Galaxy-specific; not mapped to any gateway enum. See
* docs/GalaxyRepository.md.
* </pre>
*
* <code>int32 security_classification = 9;</code>
* @param value The securityClassification to set.
* @return This builder for chaining.
@@ -10208,6 +10352,12 @@ public final class GalaxyRepositoryOuterClass extends com.google.protobuf.Genera
return this;
}
/**
* <pre>
* Raw Galaxy SQL security-classification identifier, passed through
* unchanged. Galaxy-specific; not mapped to any gateway enum. See
* docs/GalaxyRepository.md.
* </pre>
*
* <code>int32 security_classification = 9;</code>
* @return This builder for chaining.
*/
@@ -40706,16 +40706,31 @@ public final class MxaccessGateway extends com.google.protobuf.GeneratedFile {
int getVerifierUserId();
/**
* <pre>
* Credential-sensitive write value. Implementations must not log this field
* unless an explicit redacted value-logging path is enabled.
* </pre>
*
* <code>.mxaccess_gateway.v1.MxValue value = 4;</code>
* @return Whether the value field is set.
*/
boolean hasValue();
/**
* <pre>
* Credential-sensitive write value. Implementations must not log this field
* unless an explicit redacted value-logging path is enabled.
* </pre>
*
* <code>.mxaccess_gateway.v1.MxValue value = 4;</code>
* @return The value.
*/
mxaccess_gateway.v1.MxaccessGateway.MxValue getValue();
/**
* <pre>
* Credential-sensitive write value. Implementations must not log this field
* unless an explicit redacted value-logging path is enabled.
* </pre>
*
* <code>.mxaccess_gateway.v1.MxValue value = 4;</code>
*/
mxaccess_gateway.v1.MxaccessGateway.MxValueOrBuilder getValueOrBuilder();
@@ -40794,6 +40809,11 @@ public final class MxaccessGateway extends com.google.protobuf.GeneratedFile {
public static final int VALUE_FIELD_NUMBER = 4;
private mxaccess_gateway.v1.MxaccessGateway.MxValue value_;
/**
* <pre>
* Credential-sensitive write value. Implementations must not log this field
* unless an explicit redacted value-logging path is enabled.
* </pre>
*
* <code>.mxaccess_gateway.v1.MxValue value = 4;</code>
* @return Whether the value field is set.
*/
@@ -40802,6 +40822,11 @@ public final class MxaccessGateway extends com.google.protobuf.GeneratedFile {
return ((bitField0_ & 0x00000001) != 0);
}
/**
* <pre>
* Credential-sensitive write value. Implementations must not log this field
* unless an explicit redacted value-logging path is enabled.
* </pre>
*
* <code>.mxaccess_gateway.v1.MxValue value = 4;</code>
* @return The value.
*/
@@ -40810,6 +40835,11 @@ public final class MxaccessGateway extends com.google.protobuf.GeneratedFile {
return value_ == null ? mxaccess_gateway.v1.MxaccessGateway.MxValue.getDefaultInstance() : value_;
}
/**
* <pre>
* Credential-sensitive write value. Implementations must not log this field
* unless an explicit redacted value-logging path is enabled.
* </pre>
*
* <code>.mxaccess_gateway.v1.MxValue value = 4;</code>
*/
@java.lang.Override
@@ -41301,6 +41331,11 @@ public final class MxaccessGateway extends com.google.protobuf.GeneratedFile {
private com.google.protobuf.SingleFieldBuilder<
mxaccess_gateway.v1.MxaccessGateway.MxValue, mxaccess_gateway.v1.MxaccessGateway.MxValue.Builder, mxaccess_gateway.v1.MxaccessGateway.MxValueOrBuilder> valueBuilder_;
/**
* <pre>
* Credential-sensitive write value. Implementations must not log this field
* unless an explicit redacted value-logging path is enabled.
* </pre>
*
* <code>.mxaccess_gateway.v1.MxValue value = 4;</code>
* @return Whether the value field is set.
*/
@@ -41308,6 +41343,11 @@ public final class MxaccessGateway extends com.google.protobuf.GeneratedFile {
return ((bitField0_ & 0x00000008) != 0);
}
/**
* <pre>
* Credential-sensitive write value. Implementations must not log this field
* unless an explicit redacted value-logging path is enabled.
* </pre>
*
* <code>.mxaccess_gateway.v1.MxValue value = 4;</code>
* @return The value.
*/
@@ -41319,6 +41359,11 @@ public final class MxaccessGateway extends com.google.protobuf.GeneratedFile {
}
}
/**
* <pre>
* Credential-sensitive write value. Implementations must not log this field
* unless an explicit redacted value-logging path is enabled.
* </pre>
*
* <code>.mxaccess_gateway.v1.MxValue value = 4;</code>
*/
public Builder setValue(mxaccess_gateway.v1.MxaccessGateway.MxValue value) {
@@ -41335,6 +41380,11 @@ public final class MxaccessGateway extends com.google.protobuf.GeneratedFile {
return this;
}
/**
* <pre>
* Credential-sensitive write value. Implementations must not log this field
* unless an explicit redacted value-logging path is enabled.
* </pre>
*
* <code>.mxaccess_gateway.v1.MxValue value = 4;</code>
*/
public Builder setValue(
@@ -41349,6 +41399,11 @@ public final class MxaccessGateway extends com.google.protobuf.GeneratedFile {
return this;
}
/**
* <pre>
* Credential-sensitive write value. Implementations must not log this field
* unless an explicit redacted value-logging path is enabled.
* </pre>
*
* <code>.mxaccess_gateway.v1.MxValue value = 4;</code>
*/
public Builder mergeValue(mxaccess_gateway.v1.MxaccessGateway.MxValue value) {
@@ -41370,6 +41425,11 @@ public final class MxaccessGateway extends com.google.protobuf.GeneratedFile {
return this;
}
/**
* <pre>
* Credential-sensitive write value. Implementations must not log this field
* unless an explicit redacted value-logging path is enabled.
* </pre>
*
* <code>.mxaccess_gateway.v1.MxValue value = 4;</code>
*/
public Builder clearValue() {
@@ -41383,6 +41443,11 @@ public final class MxaccessGateway extends com.google.protobuf.GeneratedFile {
return this;
}
/**
* <pre>
* Credential-sensitive write value. Implementations must not log this field
* unless an explicit redacted value-logging path is enabled.
* </pre>
*
* <code>.mxaccess_gateway.v1.MxValue value = 4;</code>
*/
public mxaccess_gateway.v1.MxaccessGateway.MxValue.Builder getValueBuilder() {
@@ -41391,6 +41456,11 @@ public final class MxaccessGateway extends com.google.protobuf.GeneratedFile {
return internalGetValueFieldBuilder().getBuilder();
}
/**
* <pre>
* Credential-sensitive write value. Implementations must not log this field
* unless an explicit redacted value-logging path is enabled.
* </pre>
*
* <code>.mxaccess_gateway.v1.MxValue value = 4;</code>
*/
public mxaccess_gateway.v1.MxaccessGateway.MxValueOrBuilder getValueOrBuilder() {
@@ -41402,6 +41472,11 @@ public final class MxaccessGateway extends com.google.protobuf.GeneratedFile {
}
}
/**
* <pre>
* Credential-sensitive write value. Implementations must not log this field
* unless an explicit redacted value-logging path is enabled.
* </pre>
*
* <code>.mxaccess_gateway.v1.MxValue value = 4;</code>
*/
private com.google.protobuf.SingleFieldBuilder<
@@ -42314,16 +42389,31 @@ public final class MxaccessGateway extends com.google.protobuf.GeneratedFile {
int getVerifierUserId();
/**
* <pre>
* Credential-sensitive write value. Implementations must not log this field
* unless an explicit redacted value-logging path is enabled.
* </pre>
*
* <code>.mxaccess_gateway.v1.MxValue value = 4;</code>
* @return Whether the value field is set.
*/
boolean hasValue();
/**
* <pre>
* Credential-sensitive write value. Implementations must not log this field
* unless an explicit redacted value-logging path is enabled.
* </pre>
*
* <code>.mxaccess_gateway.v1.MxValue value = 4;</code>
* @return The value.
*/
mxaccess_gateway.v1.MxaccessGateway.MxValue getValue();
/**
* <pre>
* Credential-sensitive write value. Implementations must not log this field
* unless an explicit redacted value-logging path is enabled.
* </pre>
*
* <code>.mxaccess_gateway.v1.MxValue value = 4;</code>
*/
mxaccess_gateway.v1.MxaccessGateway.MxValueOrBuilder getValueOrBuilder();
@@ -42417,6 +42507,11 @@ public final class MxaccessGateway extends com.google.protobuf.GeneratedFile {
public static final int VALUE_FIELD_NUMBER = 4;
private mxaccess_gateway.v1.MxaccessGateway.MxValue value_;
/**
* <pre>
* Credential-sensitive write value. Implementations must not log this field
* unless an explicit redacted value-logging path is enabled.
* </pre>
*
* <code>.mxaccess_gateway.v1.MxValue value = 4;</code>
* @return Whether the value field is set.
*/
@@ -42425,6 +42520,11 @@ public final class MxaccessGateway extends com.google.protobuf.GeneratedFile {
return ((bitField0_ & 0x00000001) != 0);
}
/**
* <pre>
* Credential-sensitive write value. Implementations must not log this field
* unless an explicit redacted value-logging path is enabled.
* </pre>
*
* <code>.mxaccess_gateway.v1.MxValue value = 4;</code>
* @return The value.
*/
@@ -42433,6 +42533,11 @@ public final class MxaccessGateway extends com.google.protobuf.GeneratedFile {
return value_ == null ? mxaccess_gateway.v1.MxaccessGateway.MxValue.getDefaultInstance() : value_;
}
/**
* <pre>
* Credential-sensitive write value. Implementations must not log this field
* unless an explicit redacted value-logging path is enabled.
* </pre>
*
* <code>.mxaccess_gateway.v1.MxValue value = 4;</code>
*/
@java.lang.Override
@@ -42988,6 +43093,11 @@ public final class MxaccessGateway extends com.google.protobuf.GeneratedFile {
private com.google.protobuf.SingleFieldBuilder<
mxaccess_gateway.v1.MxaccessGateway.MxValue, mxaccess_gateway.v1.MxaccessGateway.MxValue.Builder, mxaccess_gateway.v1.MxaccessGateway.MxValueOrBuilder> valueBuilder_;
/**
* <pre>
* Credential-sensitive write value. Implementations must not log this field
* unless an explicit redacted value-logging path is enabled.
* </pre>
*
* <code>.mxaccess_gateway.v1.MxValue value = 4;</code>
* @return Whether the value field is set.
*/
@@ -42995,6 +43105,11 @@ public final class MxaccessGateway extends com.google.protobuf.GeneratedFile {
return ((bitField0_ & 0x00000008) != 0);
}
/**
* <pre>
* Credential-sensitive write value. Implementations must not log this field
* unless an explicit redacted value-logging path is enabled.
* </pre>
*
* <code>.mxaccess_gateway.v1.MxValue value = 4;</code>
* @return The value.
*/
@@ -43006,6 +43121,11 @@ public final class MxaccessGateway extends com.google.protobuf.GeneratedFile {
}
}
/**
* <pre>
* Credential-sensitive write value. Implementations must not log this field
* unless an explicit redacted value-logging path is enabled.
* </pre>
*
* <code>.mxaccess_gateway.v1.MxValue value = 4;</code>
*/
public Builder setValue(mxaccess_gateway.v1.MxaccessGateway.MxValue value) {
@@ -43022,6 +43142,11 @@ public final class MxaccessGateway extends com.google.protobuf.GeneratedFile {
return this;
}
/**
* <pre>
* Credential-sensitive write value. Implementations must not log this field
* unless an explicit redacted value-logging path is enabled.
* </pre>
*
* <code>.mxaccess_gateway.v1.MxValue value = 4;</code>
*/
public Builder setValue(
@@ -43036,6 +43161,11 @@ public final class MxaccessGateway extends com.google.protobuf.GeneratedFile {
return this;
}
/**
* <pre>
* Credential-sensitive write value. Implementations must not log this field
* unless an explicit redacted value-logging path is enabled.
* </pre>
*
* <code>.mxaccess_gateway.v1.MxValue value = 4;</code>
*/
public Builder mergeValue(mxaccess_gateway.v1.MxaccessGateway.MxValue value) {
@@ -43057,6 +43187,11 @@ public final class MxaccessGateway extends com.google.protobuf.GeneratedFile {
return this;
}
/**
* <pre>
* Credential-sensitive write value. Implementations must not log this field
* unless an explicit redacted value-logging path is enabled.
* </pre>
*
* <code>.mxaccess_gateway.v1.MxValue value = 4;</code>
*/
public Builder clearValue() {
@@ -43070,6 +43205,11 @@ public final class MxaccessGateway extends com.google.protobuf.GeneratedFile {
return this;
}
/**
* <pre>
* Credential-sensitive write value. Implementations must not log this field
* unless an explicit redacted value-logging path is enabled.
* </pre>
*
* <code>.mxaccess_gateway.v1.MxValue value = 4;</code>
*/
public mxaccess_gateway.v1.MxaccessGateway.MxValue.Builder getValueBuilder() {
@@ -43078,6 +43218,11 @@ public final class MxaccessGateway extends com.google.protobuf.GeneratedFile {
return internalGetValueFieldBuilder().getBuilder();
}
/**
* <pre>
* Credential-sensitive write value. Implementations must not log this field
* unless an explicit redacted value-logging path is enabled.
* </pre>
*
* <code>.mxaccess_gateway.v1.MxValue value = 4;</code>
*/
public mxaccess_gateway.v1.MxaccessGateway.MxValueOrBuilder getValueOrBuilder() {
@@ -43089,6 +43234,11 @@ public final class MxaccessGateway extends com.google.protobuf.GeneratedFile {
}
}
/**
* <pre>
* Credential-sensitive write value. Implementations must not log this field
* unless an explicit redacted value-logging path is enabled.
* </pre>
*
* <code>.mxaccess_gateway.v1.MxValue value = 4;</code>
*/
private com.google.protobuf.SingleFieldBuilder<
@@ -43322,6 +43472,7 @@ public final class MxaccessGateway extends com.google.protobuf.GeneratedFile {
* <pre>
* Bulk Read snapshot the current value for each requested tag. MXAccess COM
* has no synchronous Read; the worker implements ReadBulk as:
*
* - If the tag is already in the session's item registry AND that item is
* currently advised AND the worker has a cached OnDataChange for it, the
* reply returns the cached value WITHOUT modifying the existing
@@ -43330,6 +43481,7 @@ public final class MxaccessGateway extends com.google.protobuf.GeneratedFile {
* Advise, wait up to `timeout_ms` for the first OnDataChange, then
* UnAdvise + RemoveItem before returning. The session is left exactly
* as it was before the call (was_cached = false).
*
* `timeout_ms == 0` uses the gateway-configured default (1000 ms).
* </pre>
*
@@ -43619,6 +43771,7 @@ public final class MxaccessGateway extends com.google.protobuf.GeneratedFile {
* <pre>
* Bulk Read snapshot the current value for each requested tag. MXAccess COM
* has no synchronous Read; the worker implements ReadBulk as:
*
* - If the tag is already in the session's item registry AND that item is
* currently advised AND the worker has a cached OnDataChange for it, the
* reply returns the cached value WITHOUT modifying the existing
@@ -43627,6 +43780,7 @@ public final class MxaccessGateway extends com.google.protobuf.GeneratedFile {
* Advise, wait up to `timeout_ms` for the first OnDataChange, then
* UnAdvise + RemoveItem before returning. The session is left exactly
* as it was before the call (was_cached = false).
*
* `timeout_ms == 0` uses the gateway-configured default (1000 ms).
* </pre>
*
+6
View File
@@ -226,6 +226,12 @@ The client supports plaintext channels for local development, TLS with system
roots, TLS with a custom `ca_file`, and an optional test server name override.
API keys are redacted from option repr output and CLI error output.
The CLI defaults to TLS. Pass `--plaintext` explicitly to open an unencrypted
channel — there is no implicit localhost downgrade. `--tls` is accepted but
redundant with the default, and cannot be combined with `--plaintext`. Scripts
that previously relied on a `localhost:` / `127.0.0.1:` endpoint silently
selecting plaintext must now pass `--plaintext` explicitly.
## CLI
The CLI emits deterministic JSON for automation:
+33
View File
@@ -8,6 +8,31 @@ version = "0.1.0"
description = "Async Python client for MXAccess Gateway."
readme = "README.md"
requires-python = ">=3.12"
license = "Proprietary"
authors = [
{ name = "MXAccess Gateway Authors" },
]
keywords = [
"mxaccess",
"archestra",
"gateway",
"grpc",
"industrial",
"scada",
]
classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"Intended Audience :: Information Technology",
"Operating System :: Microsoft :: Windows",
"Operating System :: POSIX",
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.12",
"Topic :: Software Development :: Libraries :: Python Modules",
"Topic :: System :: Distributed Computing",
"Typing :: Typed",
]
dependencies = [
"click>=8.3,<9",
"grpcio>=1.80,<2",
@@ -21,12 +46,20 @@ dev = [
"pytest-asyncio>=1.3,<2",
]
[project.urls]
Homepage = "https://gitea.dohertylan.com/dohertj2/mxaccessgw"
Source = "https://gitea.dohertylan.com/dohertj2/mxaccessgw"
Issues = "https://gitea.dohertylan.com/dohertj2/mxaccessgw/issues"
[project.scripts]
mxgw-py = "mxgateway_cli.commands:main"
[tool.setuptools.packages.find]
where = ["src"]
[tool.setuptools.package-data]
mxgateway = ["py.typed"]
[tool.pytest.ini_options]
addopts = "-ra"
pythonpath = ["src"]
+37 -9
View File
@@ -19,8 +19,7 @@ from mxgateway.errors import MxGatewayError
from mxgateway.generated import mxaccess_gateway_pb2 as pb
from mxgateway.options import ClientOptions
from mxgateway.session import Session
from mxgateway.values import to_mx_value
from mxgateway.values import MxValueInput
from mxgateway.values import MxValueInput, to_mx_value
MAX_AGGREGATE_EVENTS = 10_000
@@ -52,8 +51,25 @@ def gateway_options(command: Callable[..., Any]) -> Callable[..., Any]:
default=None,
help="Environment variable containing the gateway API key.",
)(command)
command = click.option("--plaintext", is_flag=True, help="Use plaintext gRPC.")(command)
command = click.option("--tls", "use_tls", is_flag=True, help="Use TLS gRPC.")(command)
command = click.option(
"--plaintext",
is_flag=True,
help=(
"Use a plaintext gRPC channel. TLS is the default; pass --plaintext "
"explicitly to opt in to an unencrypted channel (no implicit "
"localhost downgrade)."
),
)(command)
command = click.option(
"--tls",
"use_tls",
is_flag=True,
help=(
"Use a TLS gRPC channel. Redundant with the default; retained for "
"symmetry with other client CLIs. Cannot be combined with "
"--plaintext."
),
)(command)
command = click.option("--ca-file", default=None, help="Custom root certificate file.")(command)
command = click.option(
"--server-name-override",
@@ -755,11 +771,23 @@ def _session(client: GatewayClient, session_id: str) -> Session:
def _use_plaintext(kwargs: dict[str, Any]) -> bool:
if kwargs.get("use_tls"):
return False
if kwargs.get("plaintext"):
return True
return kwargs["endpoint"].startswith("localhost:") or kwargs["endpoint"].startswith("127.0.0.1:")
"""Resolve whether to open a plaintext gRPC channel.
The contract matches the Go and Java CLIs (and is stricter than the
previous behaviour): TLS is the default, and the user must pass
``--plaintext`` to opt in to an unencrypted channel. There is no implicit
localhost downgrade -- silently transmitting a bearer token in cleartext
just because the endpoint starts with ``localhost:`` or ``127.0.0.1:`` was
the security regression Client.Python-013 closed. ``--tls`` is accepted as
a redundant, explicit affirmation of the default and must not be combined
with ``--plaintext``.
"""
plaintext = bool(kwargs.get("plaintext"))
use_tls = bool(kwargs.get("use_tls"))
if plaintext and use_tls:
raise click.UsageError("--plaintext and --tls are mutually exclusive.")
return plaintext
def _api_key_from_env(name: str | None) -> str | None:
+151 -1
View File
@@ -2,10 +2,12 @@
import json
import click
import pytest
from click.testing import CliRunner
from mxgateway import __version__
from mxgateway_cli.commands import main
from mxgateway_cli.commands import _use_plaintext, main
def test_version_json_is_deterministic() -> None:
@@ -66,3 +68,151 @@ def test_cli_error_output_redacts_api_key() -> None:
assert result.exit_code != 0
assert "mxgw_test_secret" not in result.output
# Regression tests for Client.Python-013: ``_use_plaintext`` must not silently
# downgrade ``localhost:`` / ``127.0.0.1:`` endpoints to plaintext. TLS is the
# default; users must pass ``--plaintext`` to opt in.
def test_use_plaintext_requires_explicit_flag_for_localhost_endpoint() -> None:
"""A ``localhost:`` endpoint with no flags must resolve to TLS."""
assert (
_use_plaintext(
{"endpoint": "localhost:5000", "plaintext": False, "use_tls": False}
)
is False
)
def test_use_plaintext_requires_explicit_flag_for_loopback_ip_endpoint() -> None:
"""A ``127.0.0.1:`` endpoint with no flags must resolve to TLS."""
assert (
_use_plaintext(
{"endpoint": "127.0.0.1:5000", "plaintext": False, "use_tls": False}
)
is False
)
def test_use_plaintext_explicit_plaintext_flag_opts_in() -> None:
"""``--plaintext`` must select plaintext regardless of endpoint host."""
assert (
_use_plaintext(
{"endpoint": "localhost:5000", "plaintext": True, "use_tls": False}
)
is True
)
assert (
_use_plaintext(
{
"endpoint": "mxgateway.example.local:5001",
"plaintext": True,
"use_tls": False,
}
)
is True
)
def test_use_plaintext_explicit_tls_flag_is_accepted_and_idempotent() -> None:
"""``--tls`` is accepted as a redundant affirmation of the default."""
assert (
_use_plaintext(
{
"endpoint": "mxgateway.example.local:5001",
"plaintext": False,
"use_tls": True,
}
)
is False
)
# Even for a localhost endpoint, ``--tls`` (the default) must yield TLS.
assert (
_use_plaintext(
{"endpoint": "localhost:5000", "plaintext": False, "use_tls": True}
)
is False
)
def test_use_plaintext_rejects_conflicting_flags() -> None:
"""``--plaintext`` combined with ``--tls`` is a usage error."""
with pytest.raises(click.UsageError):
_use_plaintext(
{"endpoint": "localhost:5000", "plaintext": True, "use_tls": True}
)
def test_cli_localhost_endpoint_defaults_to_tls_via_open_session(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""End-to-end: ``open-session`` against ``localhost:`` with no flags
must build a TLS ``ClientOptions`` (plaintext=False)."""
captured: dict[str, object] = {}
async def _fake_connect(options): # type: ignore[no-untyped-def]
captured["plaintext"] = options.plaintext
raise RuntimeError("stop-before-network")
monkeypatch.setattr(
"mxgateway_cli.commands.GatewayClient.connect", _fake_connect
)
runner = CliRunner()
result = runner.invoke(
main,
[
"open-session",
"--endpoint",
"localhost:5000",
"--api-key",
"mxgw_test_secret",
"--json",
],
)
assert result.exit_code != 0 # connect was stubbed to raise
assert captured.get("plaintext") is False, (
"localhost endpoint must default to TLS without an explicit --plaintext "
"flag (Client.Python-013 regression)."
)
def test_cli_localhost_endpoint_with_plaintext_flag_uses_plaintext(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""End-to-end: ``--plaintext`` opts in to plaintext as expected."""
captured: dict[str, object] = {}
async def _fake_connect(options): # type: ignore[no-untyped-def]
captured["plaintext"] = options.plaintext
raise RuntimeError("stop-before-network")
monkeypatch.setattr(
"mxgateway_cli.commands.GatewayClient.connect", _fake_connect
)
runner = CliRunner()
result = runner.invoke(
main,
[
"open-session",
"--endpoint",
"localhost:5000",
"--api-key",
"mxgw_test_secret",
"--plaintext",
"--json",
],
)
assert result.exit_code != 0
assert captured.get("plaintext") is True
@@ -0,0 +1,454 @@
"""Regression tests for Client.Python-015 and Client.Python-016.
Client.Python-015 coverage for the ``bench-read-bulk`` CLI body and the
``_percentile`` / ``_percentile_summary`` helpers. The percentile algorithm
must remain byte-for-byte equivalent across the five client languages
(.NET, Go, Rust, Java, Python) so cross-language stats are directly
comparable; the unit tests here lock that contract down with known inputs.
Client.Python-016 coverage for the two remaining untested CLI helpers
after Client.Python-013 removed the localhost auto-plaintext branch from
``_use_plaintext``: the ``MAX_AGGREGATE_EVENTS`` guard inside
``_collect_events`` and the ``_api_key_from_env`` env-var helper.
"""
from __future__ import annotations
import json
from typing import Any
import pytest
from click.testing import CliRunner
from mxgateway import ClientOptions, GatewayClient
from mxgateway.generated import mxaccess_gateway_pb2 as pb
from mxgateway_cli import commands
from mxgateway_cli.commands import (
MAX_AGGREGATE_EVENTS,
_api_key_from_env,
_percentile,
_percentile_summary,
)
# --- Client.Python-015: _percentile / _percentile_summary ------------------
#
# The algorithm is "linear interpolation between the two closest ranks", with
# the rank computed as ``q * (n - 1)``. This matches the .NET, Go, Rust and
# Java drivers; any divergence corrupts cross-language comparisons.
def test_percentile_empty_sample_returns_zero() -> None:
assert _percentile([], 0.50) == 0.0
assert _percentile([], 0.95) == 0.0
assert _percentile([], 0.99) == 0.0
def test_percentile_single_element_returns_that_element() -> None:
assert _percentile([42.0], 0.0) == 42.0
assert _percentile([42.0], 0.50) == 42.0
assert _percentile([42.0], 0.95) == 42.0
assert _percentile([42.0], 1.0) == 42.0
def test_percentile_exact_rank_returns_sample_value() -> None:
# n = 5 → rank for p50 = 0.5 * 4 = 2 → exact index 2 (value 30.0).
sample = [10.0, 20.0, 30.0, 40.0, 50.0]
assert _percentile(sample, 0.50) == 30.0
assert _percentile(sample, 0.0) == 10.0
assert _percentile(sample, 1.0) == 50.0
def test_percentile_interpolates_between_ranks() -> None:
# n = 5 → rank for p95 = 0.95 * 4 = 3.8 → between index 3 (40.0) and
# index 4 (50.0) with fraction 0.8 → 40.0 + (50.0 - 40.0) * 0.8 = 48.0.
sample = [10.0, 20.0, 30.0, 40.0, 50.0]
assert _percentile(sample, 0.95) == pytest.approx(48.0)
# p99 = 0.99 * 4 = 3.96 → 40.0 + 10.0 * 0.96 = 49.6.
assert _percentile(sample, 0.99) == pytest.approx(49.6)
def test_percentile_summary_empty_sample_zeros_all_fields() -> None:
assert _percentile_summary([]) == {
"p50": 0.0,
"p95": 0.0,
"p99": 0.0,
"max": 0.0,
"mean": 0.0,
}
def test_percentile_summary_known_sample_matches_cross_language_contract() -> None:
# The same five-element sample as the percentile interpolation test; the
# summary must be byte-for-byte the values the .NET / Go / Rust / Java
# drivers produce for the same input (linear interpolation, q * (n-1)).
sample = [10.0, 20.0, 30.0, 40.0, 50.0]
summary = _percentile_summary(sample)
assert summary == {
"p50": 30.0,
"p95": 48.0,
"p99": 49.6,
"max": 50.0,
"mean": 30.0,
}
def test_percentile_summary_rounds_to_three_decimal_places() -> None:
# 1, 2, 3 → p95 = 0.95 * 2 = 1.9 → 2 + (3 - 2) * 0.9 = 2.9; round(2.9, 3)
# is 2.9. Use a sample that exercises the round() call non-trivially.
sample = [1.0, 2.0, 3.0001, 4.0001]
summary = _percentile_summary(sample)
# mean = (1 + 2 + 3.0001 + 4.0001) / 4 = 2.50005 → rounded to 2.5
assert summary["mean"] == 2.5
# max round to 3dp = 4.0
assert summary["max"] == 4.0
# --- Client.Python-015: bench-read-bulk CLI smoke test ---------------------
class _BenchFakeUnary:
"""A fake unary RPC that pops a reply per call (cycling on exhaustion)."""
def __init__(self, replies_factory: Any) -> None:
self._factory = replies_factory
self.requests: list[Any] = []
async def __call__(
self,
request: Any,
*,
metadata: tuple[tuple[str, str], ...],
) -> Any:
self.requests.append(request)
return self._factory(request)
def _ok_reply(kind: int, **fields: Any) -> pb.MxCommandReply:
return pb.MxCommandReply(
session_id="session-bench",
kind=kind,
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
**fields,
)
class _BenchStub:
"""Fake gateway stub that handles OpenSession + Invoke for bench-read-bulk."""
def __init__(self, tags: list[str]) -> None:
self._tags = tags
async def open_session(
request: Any,
*,
metadata: tuple[tuple[str, str], ...],
) -> Any:
return pb.OpenSessionReply(
session_id="session-bench",
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
)
async def close_session(
request: Any,
*,
metadata: tuple[tuple[str, str], ...],
) -> Any:
return pb.CloseSessionReply(
session_id=request.session_id,
final_state=pb.SESSION_STATE_CLOSED,
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
)
def _reply_for(request: Any) -> Any:
kind = request.command.kind
if kind == pb.MX_COMMAND_KIND_REGISTER:
return _ok_reply(
kind,
register=pb.RegisterReply(server_handle=7),
)
if kind == pb.MX_COMMAND_KIND_SUBSCRIBE_BULK:
results = [
pb.SubscribeResult(
server_handle=7,
tag_address=tag,
item_handle=100 + i,
was_successful=True,
)
for i, tag in enumerate(self._tags)
]
return _ok_reply(
kind,
subscribe_bulk=pb.BulkSubscribeReply(results=results),
)
if kind == pb.MX_COMMAND_KIND_UNSUBSCRIBE_BULK:
results = [
pb.SubscribeResult(
server_handle=7,
item_handle=100 + i,
was_successful=True,
)
for i in range(len(self._tags))
]
return _ok_reply(
kind,
unsubscribe_bulk=pb.BulkSubscribeReply(results=results),
)
if kind == pb.MX_COMMAND_KIND_READ_BULK:
results = [
pb.BulkReadResult(
server_handle=7,
tag_address=tag,
item_handle=100 + i,
was_successful=True,
was_cached=True,
)
for i, tag in enumerate(self._tags)
]
return _ok_reply(
kind,
read_bulk=pb.BulkReadReply(results=results),
)
raise AssertionError(f"unexpected MxCommand kind in bench test: {kind}")
self.OpenSession = open_session
self.CloseSession = close_session
self.Invoke = _BenchFakeUnary(_reply_for)
def test_bench_read_bulk_emits_cross_language_schema(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Drive bench-read-bulk with duration=0 / warmup=0 and assert the schema.
A drift in any of these field names (callsPerSecond, cachedReadResults,
latencyMs.p50, ) would break the cross-language
scripts/bench-read-bulk.ps1 aggregation silently.
"""
bulk_size = 3
tags = [f"TestMachine_{i:03d}.TestChangingInt" for i in range(1, 1 + bulk_size)]
async def _fake_connect(kwargs: dict[str, Any]) -> GatewayClient:
return await GatewayClient.connect(
ClientOptions(endpoint=kwargs["endpoint"], plaintext=True),
stub=_BenchStub(tags),
)
monkeypatch.setattr(commands, "_connect", _fake_connect)
runner = CliRunner()
result = runner.invoke(
commands.main,
[
"bench-read-bulk",
"--endpoint",
"localhost:5000",
"--client-name",
"pytest-bench",
"--duration-seconds",
"0",
"--warmup-seconds",
"0",
"--bulk-size",
str(bulk_size),
"--tag-start",
"1",
"--json",
],
)
assert result.exit_code == 0, result.output
payload = json.loads(result.output)
# Locked cross-language schema (matches .NET / Go / Rust / Java drivers).
expected_top_level = {
"language",
"command",
"endpoint",
"clientName",
"bulkSize",
"durationSeconds",
"warmupSeconds",
"durationMs",
"tags",
"totalCalls",
"successfulCalls",
"failedCalls",
"totalReadResults",
"cachedReadResults",
"callsPerSecond",
"latencyMs",
}
assert set(payload.keys()) == expected_top_level
assert payload["language"] == "python"
assert payload["command"] == "bench-read-bulk"
assert payload["endpoint"] == "localhost:5000"
assert payload["clientName"] == "pytest-bench"
assert payload["bulkSize"] == bulk_size
assert payload["durationSeconds"] == 0
assert payload["warmupSeconds"] == 0
assert payload["tags"] == tags
# latencyMs sub-shape is the percentile-summary contract.
assert set(payload["latencyMs"].keys()) == {"p50", "p95", "p99", "max", "mean"}
for key in ("p50", "p95", "p99", "max", "mean"):
assert isinstance(payload["latencyMs"][key], (int, float))
# --- Client.Python-016: MAX_AGGREGATE_EVENTS guard -------------------------
def test_collect_events_rejects_max_events_above_aggregate_cap(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""``--max-events`` greater than ``MAX_AGGREGATE_EVENTS`` exits non-zero
with the documented error message.
The guard lives inside ``_collect_events`` (after a session is opened),
so the test routes the CLI through stubbed ``_connect`` / ``_session``
fakes and asserts the guard fires before any event is pulled.
"""
class _EventStreamShouldNotBeUsed:
def __aiter__(self) -> "_EventStreamShouldNotBeUsed":
return self
async def __anext__(self) -> pb.MxEvent:
raise AssertionError(
"MAX_AGGREGATE_EVENTS guard must trip before any event is pulled",
)
class _FakeSession:
def __init__(self) -> None:
self.session_id = "session-1"
def stream_events(
self, *, after_worker_sequence: int = 0
) -> _EventStreamShouldNotBeUsed:
return _EventStreamShouldNotBeUsed()
class _FakeClient:
async def __aenter__(self) -> "_FakeClient":
return self
async def __aexit__(self, *exc_info: object) -> None:
return None
async def _fake_connect(kwargs: dict[str, Any]) -> _FakeClient:
return _FakeClient()
def _fake_session(client: Any, session_id: str) -> _FakeSession:
return _FakeSession()
monkeypatch.setattr(commands, "_connect", _fake_connect)
monkeypatch.setattr(commands, "_session", _fake_session)
runner = CliRunner()
result = runner.invoke(
commands.main,
[
"stream-events",
"--endpoint",
"localhost:5000",
"--session-id",
"session-1",
"--max-events",
str(MAX_AGGREGATE_EVENTS + 1),
"--plaintext",
"--json",
],
)
assert result.exit_code != 0
assert f"less than or equal to {MAX_AGGREGATE_EVENTS}" in result.output
assert "--max-events" in result.output
def test_collect_events_accepts_max_events_at_aggregate_cap_boundary(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""``--max-events`` equal to ``MAX_AGGREGATE_EVENTS`` must not trip the guard."""
class _EmptyEventStream:
def __aiter__(self) -> "_EmptyEventStream":
return self
async def __anext__(self) -> pb.MxEvent:
raise StopAsyncIteration
class _FakeSession:
def __init__(self) -> None:
self.client = None # type: ignore[assignment]
self.session_id = "session-1"
def stream_events(self, *, after_worker_sequence: int = 0) -> _EmptyEventStream:
return _EmptyEventStream()
class _FakeClient:
async def __aenter__(self) -> "_FakeClient":
return self
async def __aexit__(self, *exc_info: object) -> None:
return None
async def _fake_connect(kwargs: dict[str, Any]) -> _FakeClient:
return _FakeClient()
def _fake_session(client: Any, session_id: str) -> _FakeSession:
return _FakeSession()
monkeypatch.setattr(commands, "_connect", _fake_connect)
monkeypatch.setattr(commands, "_session", _fake_session)
runner = CliRunner()
result = runner.invoke(
commands.main,
[
"stream-events",
"--endpoint",
"localhost:5000",
"--session-id",
"session-1",
"--max-events",
str(MAX_AGGREGATE_EVENTS),
"--timeout",
"0.01",
"--plaintext",
"--json",
],
)
assert result.exit_code == 0, result.output
payload = json.loads(result.output)
assert payload == {"events": []}
# --- Client.Python-016: _api_key_from_env ----------------------------------
def test_api_key_from_env_resolves_value_when_variable_is_set(
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.setenv("MXGATEWAY_TEST_API_KEY", "mxgw_envtest_secret")
assert _api_key_from_env("MXGATEWAY_TEST_API_KEY") == "mxgw_envtest_secret"
def test_api_key_from_env_returns_none_when_variable_is_unset(
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.delenv("MXGATEWAY_TEST_API_KEY_NOT_SET", raising=False)
assert _api_key_from_env("MXGATEWAY_TEST_API_KEY_NOT_SET") is None
def test_api_key_from_env_returns_none_when_name_is_none() -> None:
assert _api_key_from_env(None) is None
def test_api_key_from_env_returns_none_when_name_is_empty_string() -> None:
# The implementation guards on ``if not name`` so empty string is treated
# the same as ``None`` — no env lookup is attempted.
assert _api_key_from_env("") is None
+54 -12
View File
@@ -93,11 +93,24 @@ impl Session {
pub async fn subscribe_bulk(&self, server_handle: i32, tag_addresses: Vec<String>) -> Result<Vec<SubscribeResult>, Error>;
pub async fn unsubscribe_bulk(&self, server_handle: i32, item_handles: Vec<i32>) -> Result<Vec<SubscribeResult>, Error>;
pub async fn write(&self, server_handle: i32, item_handle: i32, value: MxValue, user_id: i32) -> Result<(), Error>;
pub async fn write_bulk(&self, server_handle: i32, entries: Vec<WriteBulkEntry>, user_id: i32) -> Result<Vec<BulkWriteResult>, Error>;
pub async fn write2_bulk(&self, server_handle: i32, entries: Vec<Write2BulkEntry>, timestamp: prost_types::Timestamp, user_id: i32) -> Result<Vec<BulkWriteResult>, Error>;
pub async fn write_secured_bulk(&self, server_handle: i32, entries: Vec<WriteSecuredBulkEntry>, current_user_id: i32, verifier_user_id: i32) -> Result<Vec<BulkWriteResult>, Error>;
pub async fn write_secured2_bulk(&self, server_handle: i32, entries: Vec<WriteSecured2BulkEntry>, timestamp: prost_types::Timestamp, current_user_id: i32, verifier_user_id: i32) -> Result<Vec<BulkWriteResult>, Error>;
pub async fn read_bulk(&self, server_handle: i32, tags: &[String], timeout_ms: u32) -> Result<Vec<ReadBulkResult>, Error>;
pub async fn events(&self) -> Result<impl Stream<Item = Result<MxEvent, Error>>, Error>;
pub async fn close(&self) -> Result<(), Error>;
}
```
The five bulk-write helpers (`write_bulk`, `write2_bulk`, `write_secured_bulk`,
`write_secured2_bulk`) and `read_bulk` mirror the worker's bulk command shapes
in `mxaccess_gateway.proto` and use the same correlation-id discipline as the
unary helpers — `session::next_correlation_id` is `pub` so that consumers
constructing raw `MxCommandRequest`/`CloseSessionRequest` payloads outside
the `Session` helpers (notably the `mxgw` test CLI's `ping` and
`close-session` subcommands) share the same id generation.
## Authentication
Use a `tonic` interceptor or request extension layer to add:
@@ -132,19 +145,29 @@ Use `thiserror`:
```rust
pub enum Error {
InvalidEndpoint { endpoint: String, detail: String },
InvalidArgument { name: String, detail: String },
Transport(tonic::transport::Error),
Status(tonic::Status),
Authentication(String),
Authorization(String),
Session(SessionError),
Worker(WorkerError),
Command(CommandError),
MxAccess(MxAccessError),
Timeout,
Cancelled,
Authentication { message: String, status: Box<tonic::Status> },
Authorization { message: String, status: Box<tonic::Status> },
Timeout { message: String, status: Box<tonic::Status> },
Cancelled { message: String, status: Box<tonic::Status> },
Unavailable { message: String, status: Box<tonic::Status> },
Status(Box<tonic::Status>),
Command(Box<CommandError>),
ProtocolStatus { operation: &'static str, code: ProtocolStatusCode, message: String },
MalformedReply { detail: String },
}
```
`Unavailable` classifies the transient `Code::Unavailable` /
`Code::ResourceExhausted` statuses so callers can decide whether to retry
without unwrapping the raw status. `MalformedReply` surfaces OK replies
whose payload does not carry the data the command contract requires (for
example, an `AddItem` reply missing the item handle, or a `WriteBulk` reply
carrying the wrong payload arm). `InvalidEndpoint` is returned when the
endpoint URL fails to parse or its TLS material cannot be loaded.
Preserve raw command replies in `CommandError` where applicable.
## Test CLI
@@ -153,13 +176,32 @@ Binary: `mxgw`.
Use `clap` derive.
Commands:
Commands (see `clients/rust/README.md` for full argument lists):
```text
mxgw version
mxgw smoke --endpoint http://localhost:5000 --api-key-env MXGATEWAY_API_KEY --item TestChildObject.TestInt
mxgw ping
mxgw open-session
mxgw close-session --session-id <id>
mxgw register --session-id <id> --client-name <name>
mxgw add-item --session-id <id> --server-handle <h> --item <tag>
mxgw advise --session-id <id> --server-handle <h> --item-handle <h>
mxgw subscribe-bulk --session-id <id> --server-handle <h> --items <a,b,c>
mxgw unsubscribe-bulk --session-id <id> --server-handle <h> --item-handles <1,2,3>
mxgw read-bulk --session-id <id> --server-handle <h> --items <a,b,c> --timeout-ms 1500
mxgw write --session-id <id> --server-handle 1 --item-handle 1 --value-type int32 --value 123
mxgw write2 --session-id <id> --server-handle 1 --item-handle 1 --value-type int32 --value 123 --timestamp <rfc3339>
mxgw write-bulk --session-id <id> --server-handle <h> --item-handles <1,2> --value-type int32 --values <1,2>
mxgw write2-bulk --session-id <id> --server-handle <h> --item-handles <1,2> --value-type int32 --values <1,2> --timestamp <rfc3339>
mxgw write-secured-bulk --session-id <id> --server-handle <h> --item-handles <1,2> --value-type int32 --values <1,2>
mxgw write-secured2-bulk --session-id <id> --server-handle <h> --item-handles <1,2> --value-type int32 --values <1,2> --timestamp <rfc3339>
mxgw stream-events --session-id <id> --json
mxgw write --session-id <id> --server-handle 1 --item-handle 1 --type int32 --value 123
mxgw bench-read-bulk --duration-seconds 30 --bulk-size 6 --json
mxgw smoke --endpoint http://localhost:5000 --api-key-env MXGATEWAY_API_KEY --item TestChildObject.TestInt
mxgw galaxy test-connection
mxgw galaxy last-deploy-time
mxgw galaxy discover-hierarchy
mxgw galaxy watch [--last-seen-deploy-time <rfc3339>] [--max-events N]
```
JSON output should use `serde_json`.
+199 -51
View File
@@ -447,7 +447,9 @@ async fn run(cli: Cli) -> Result<(), Error> {
let client = connect(connection).await?;
let reply = client
.invoke(MxCommandRequest {
client_correlation_id: "rust-cli-ping".to_owned(),
client_correlation_id: mxgateway_client::session::next_correlation_id(
"cli-ping",
),
command: Some(MxCommand {
kind: MxCommandKind::Ping as i32,
payload: Some(mxgateway_client::generated::mxaccess_gateway::v1::mx_command::Payload::Ping(
@@ -494,7 +496,9 @@ async fn run(cli: Cli) -> Result<(), Error> {
let reply = client
.close_session_raw(CloseSessionRequest {
session_id,
client_correlation_id: "rust-cli-close-session".to_owned(),
client_correlation_id: mxgateway_client::session::next_correlation_id(
"cli-close-session",
),
})
.await?;
if json {
@@ -1034,19 +1038,13 @@ async fn run_bench_read_bulk(
.map(|r| r.item_handle)
.collect();
let warmup_deadline = std::time::Instant::now()
+ std::time::Duration::from_secs(warmup_seconds);
let warmup_deadline =
std::time::Instant::now() + std::time::Duration::from_secs(warmup_seconds);
while std::time::Instant::now() < warmup_deadline {
let _ = session
.read_bulk(server_handle, &tags, timeout_ms)
.await;
let _ = session.read_bulk(server_handle, &tags, timeout_ms).await;
}
let mut latencies_ms: Vec<f64> = Vec::with_capacity(65_536);
let mut total_read_results: u64 = 0;
let mut cached_read_results: u64 = 0;
let mut successful_calls: u64 = 0;
let mut failed_calls: u64 = 0;
let mut stats = BenchReadBulkStats::default();
let steady_start = std::time::Instant::now();
let steady_deadline = steady_start + std::time::Duration::from_secs(duration_seconds);
@@ -1054,18 +1052,9 @@ async fn run_bench_read_bulk(
let call_start = std::time::Instant::now();
let outcome = session.read_bulk(server_handle, &tags, timeout_ms).await;
let elapsed_ms = call_start.elapsed().as_secs_f64() * 1000.0;
latencies_ms.push(elapsed_ms);
match outcome {
Ok(results) => {
successful_calls += 1;
for r in &results {
total_read_results += 1;
if r.was_cached {
cached_read_results += 1;
}
}
}
Err(_) => failed_calls += 1,
Ok(results) => stats.record_success(elapsed_ms, &results),
Err(error) => stats.record_failure(elapsed_ms, &error),
}
}
let steady_elapsed = steady_start.elapsed();
@@ -1074,36 +1063,20 @@ async fn run_bench_read_bulk(
let _ = session.unsubscribe_bulk(server_handle, item_handles).await;
}
let total_calls = successful_calls + failed_calls;
let calls_per_second = if steady_elapsed.as_secs_f64() > 0.0 {
total_calls as f64 / steady_elapsed.as_secs_f64()
} else {
0.0
let context = BenchReadBulkContext {
endpoint: &endpoint,
client_name: &client_name,
bulk_size,
duration_seconds,
warmup_seconds,
steady_elapsed,
tags: &tags,
};
let summary = percentile_summary(&latencies_ms);
let stats = serde_json::json!({
"language": "rust",
"command": "bench-read-bulk",
"endpoint": endpoint,
"clientName": client_name,
"bulkSize": bulk_size,
"durationSeconds": duration_seconds,
"warmupSeconds": warmup_seconds,
"durationMs": steady_elapsed.as_millis() as u64,
"tags": tags,
"totalCalls": total_calls,
"successfulCalls": successful_calls,
"failedCalls": failed_calls,
"totalReadResults": total_read_results,
"cachedReadResults": cached_read_results,
"callsPerSecond": round_to(calls_per_second, 2),
"latencyMs": summary,
});
let json_stats = stats.to_json(&context);
if use_json {
println!("{}", stats);
println!("{}", json_stats);
} else {
println!("{calls_per_second}");
println!("{}", stats.calls_per_second(steady_elapsed));
}
Ok::<(), Error>(())
}
@@ -1113,6 +1086,102 @@ async fn run_bench_read_bulk(
bench_outcome
}
/// Per-iteration accounting for `bench-read-bulk`.
///
/// Only successful `read_bulk` calls contribute to the success-latency
/// histogram (`success_latencies_ms`). Failures are tracked separately in
/// `failure_latencies_ms` and the first failure's redacted error string is
/// stashed in `first_failure` so a partial-failure run is visible in the
/// emitted JSON. This keeps the cross-language `latencyMs.p99`/`max`
/// contract honest: it reports successful-call latency only and never
/// folds in a per-call timeout from a failed RPC.
#[derive(Default)]
struct BenchReadBulkStats {
success_latencies_ms: Vec<f64>,
failure_latencies_ms: Vec<f64>,
total_read_results: u64,
cached_read_results: u64,
successful_calls: u64,
failed_calls: u64,
first_failure: Option<String>,
}
impl BenchReadBulkStats {
fn record_success(
&mut self,
elapsed_ms: f64,
results: &[mxgateway_client::generated::mxaccess_gateway::v1::BulkReadResult],
) {
self.success_latencies_ms.push(elapsed_ms);
self.successful_calls += 1;
for result in results {
self.total_read_results += 1;
if result.was_cached {
self.cached_read_results += 1;
}
}
}
fn record_failure(&mut self, elapsed_ms: f64, error: &Error) {
self.failure_latencies_ms.push(elapsed_ms);
self.failed_calls += 1;
if self.first_failure.is_none() {
self.first_failure = Some(error.to_string());
}
}
fn total_calls(&self) -> u64 {
self.successful_calls + self.failed_calls
}
fn calls_per_second(&self, elapsed: std::time::Duration) -> f64 {
let seconds = elapsed.as_secs_f64();
if seconds > 0.0 {
self.total_calls() as f64 / seconds
} else {
0.0
}
}
fn to_json(&self, context: &BenchReadBulkContext<'_>) -> serde_json::Value {
let calls_per_second = self.calls_per_second(context.steady_elapsed);
let success_summary = percentile_summary(&self.success_latencies_ms);
let failure_summary = percentile_summary(&self.failure_latencies_ms);
serde_json::json!({
"language": "rust",
"command": "bench-read-bulk",
"endpoint": context.endpoint,
"clientName": context.client_name,
"bulkSize": context.bulk_size,
"durationSeconds": context.duration_seconds,
"warmupSeconds": context.warmup_seconds,
"durationMs": context.steady_elapsed.as_millis() as u64,
"tags": context.tags,
"totalCalls": self.total_calls(),
"successfulCalls": self.successful_calls,
"failedCalls": self.failed_calls,
"totalReadResults": self.total_read_results,
"cachedReadResults": self.cached_read_results,
"callsPerSecond": round_to(calls_per_second, 2),
"latencyMs": success_summary,
"failureLatencyMs": failure_summary,
"firstFailure": self.first_failure,
})
}
}
/// Static configuration for one `bench-read-bulk` run, packaged so the
/// JSON serialiser can quote it back without taking eight positional args.
struct BenchReadBulkContext<'a> {
endpoint: &'a str,
client_name: &'a str,
bulk_size: usize,
duration_seconds: u64,
warmup_seconds: u64,
steady_elapsed: std::time::Duration,
tags: &'a [String],
}
fn percentile_summary(sample: &[f64]) -> serde_json::Value {
if sample.is_empty() {
return serde_json::json!({ "p50": 0.0, "p95": 0.0, "p99": 0.0, "max": 0.0, "mean": 0.0 });
@@ -1294,7 +1363,13 @@ fn build_write_bulk_entries(
item_handles: &[i32],
value_type: CliValueType,
values: &[String],
) -> Result<Vec<(i32, mxgateway_client::generated::mxaccess_gateway::v1::MxValue)>, Error> {
) -> Result<
Vec<(
i32,
mxgateway_client::generated::mxaccess_gateway::v1::MxValue,
)>,
Error,
> {
if item_handles.len() != values.len() {
return Err(Error::InvalidArgument {
name: "values".to_owned(),
@@ -1660,4 +1735,77 @@ mod tests {
assert_eq!(frac.seconds, utc.seconds);
assert_eq!(frac.nanos, 250_000_000);
}
#[test]
fn bench_read_bulk_stats_keeps_failures_out_of_success_latency_histogram() {
use mxgateway_client::generated::mxaccess_gateway::v1::BulkReadResult;
use mxgateway_client::Error;
let mut stats = super::BenchReadBulkStats::default();
let cached = BulkReadResult {
was_cached: true,
was_successful: true,
..BulkReadResult::default()
};
let uncached = BulkReadResult {
was_cached: false,
was_successful: true,
..BulkReadResult::default()
};
// Two fast successes and one slow failure: the slow failure must
// not pollute the success p99/max histogram.
stats.record_success(1.5, std::slice::from_ref(&cached));
stats.record_success(2.0, std::slice::from_ref(&uncached));
let failure = Error::MalformedReply {
detail: "synthetic failure for the bench test".to_owned(),
};
stats.record_failure(1_500.0, &failure);
assert_eq!(stats.success_latencies_ms, vec![1.5, 2.0]);
assert_eq!(stats.failure_latencies_ms, vec![1_500.0]);
assert_eq!(stats.successful_calls, 2);
assert_eq!(stats.failed_calls, 1);
assert_eq!(stats.total_calls(), 3);
assert_eq!(stats.total_read_results, 2);
assert_eq!(stats.cached_read_results, 1);
assert!(stats
.first_failure
.as_deref()
.unwrap()
.contains("synthetic failure"));
let elapsed = std::time::Duration::from_secs(1);
let context = super::BenchReadBulkContext {
endpoint: "http://fake",
client_name: "client",
bulk_size: 2,
duration_seconds: 1,
warmup_seconds: 0,
steady_elapsed: elapsed,
tags: &[],
};
let payload = stats.to_json(&context);
// The success-latency histogram must never see the 1_500 ms failure.
assert_eq!(payload["latencyMs"]["max"].as_f64().unwrap(), 2.0);
assert!(payload["latencyMs"]["p99"].as_f64().unwrap() <= 2.0);
// The failure-latency histogram must own it instead.
assert_eq!(
payload["failureLatencyMs"]["max"].as_f64().unwrap(),
1_500.0
);
assert_eq!(payload["failedCalls"].as_u64().unwrap(), 1);
assert_eq!(payload["successfulCalls"].as_u64().unwrap(), 2);
assert!(payload["firstFailure"]
.as_str()
.unwrap()
.contains("synthetic failure"));
}
#[test]
fn bench_read_bulk_stats_calls_per_second_handles_zero_duration() {
let stats = super::BenchReadBulkStats::default();
assert_eq!(stats.calls_per_second(std::time::Duration::ZERO), 0.0);
}
}
+3
View File
@@ -14,6 +14,7 @@ pub mod mxaccess_gateway {
/// gateway to language clients.
pub mod v1 {
#![allow(clippy::large_enum_variant)]
#![allow(clippy::doc_lazy_continuation)]
tonic::include_proto!("mxaccess_gateway.v1");
}
@@ -25,6 +26,7 @@ pub mod mxaccess_worker {
/// the named-pipe transport between gateway and worker.
pub mod v1 {
#![allow(clippy::large_enum_variant)]
#![allow(clippy::doc_lazy_continuation)]
tonic::include_proto!("mxaccess_worker.v1");
}
@@ -36,6 +38,7 @@ pub mod galaxy_repository {
/// discovery and deploy-event watch RPCs.
pub mod v1 {
#![allow(clippy::large_enum_variant)]
#![allow(clippy::doc_lazy_continuation)]
tonic::include_proto!("galaxy_repository.v1");
}
+9 -3
View File
@@ -33,7 +33,14 @@ static CORRELATION_SEQUENCE: AtomicU64 = AtomicU64::new(0);
/// Build a unique `client_correlation_id` for a request so concurrent or
/// repeated calls of the same command kind can be told apart in gateway logs.
fn next_correlation_id(label: &str) -> String {
///
/// Exposed so consumers that construct raw [`MxCommandRequest`] /
/// [`CloseSessionRequest`] payloads outside the `Session` helpers — notably
/// the `mxgw` test CLI — share the same correlation-id discipline as the
/// library. The returned id is `rust-client-{label}-{N}` where `N` comes
/// from a process-wide atomic sequence.
#[must_use]
pub fn next_correlation_id(label: &str) -> String {
let sequence = CORRELATION_SEQUENCE.fetch_add(1, Ordering::Relaxed);
format!("rust-client-{label}-{sequence}")
}
@@ -761,8 +768,7 @@ fn bulk_write_results(
BulkWriteReplyKind::WriteSecured2,
) => Ok(reply.results),
_ => Err(Error::MalformedReply {
detail: "bulk write reply did not carry the expected BulkWriteReply payload"
.to_owned(),
detail: "bulk write reply did not carry the expected BulkWriteReply payload".to_owned(),
}),
}
}
+394 -30
View File
@@ -20,7 +20,8 @@ use mxgateway_client::generated::mxaccess_gateway::v1::{
CloseSessionReply, CloseSessionRequest, MxCommandKind, MxCommandReply, MxDataType, MxEvent,
MxEventFamily, MxStatusCategory, MxStatusProxy, MxStatusSource, MxValue, OpenSessionReply,
OpenSessionRequest, ProtocolStatus, ProtocolStatusCode, QueryActiveAlarmsRequest, SessionState,
StreamEventsRequest, SubscribeResult, WriteBulkEntry,
StreamEventsRequest, SubscribeResult, Write2BulkEntry, WriteBulkEntry, WriteSecured2BulkEntry,
WriteSecuredBulkEntry,
};
use mxgateway_client::{
ApiKey, ClientOptions, CommandError, Error, GatewayClient, MxStatus, MxValue as ClientMxValue,
@@ -160,7 +161,10 @@ async fn read_bulk_forwards_timeout_and_unpacks_cached_flag() {
let entry = &results[0];
assert!(entry.was_cached);
assert_eq!(entry.value.as_ref().and_then(|v| v.kind.as_ref()), Some(&Kind::Int32Value(99)));
assert_eq!(
entry.value.as_ref().and_then(|v| v.kind.as_ref()),
Some(&Kind::Int32Value(99))
);
assert_eq!(*state.last_read_bulk_timeout_ms.lock().await, Some(750));
}
@@ -393,6 +397,238 @@ async fn connect_with_unreadable_ca_file_reports_invalid_endpoint() {
);
}
#[tokio::test]
async fn register_returns_malformed_reply_when_ok_reply_has_no_payload() {
let state = Arc::new(FakeState::default());
*state.invoke_override.lock().await = Some(InvokeOverride::OkReplyNoPayload);
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let error = session.register("client-name").await.unwrap_err();
assert!(
matches!(&error, Error::MalformedReply { detail } if detail.contains("Register")),
"expected MalformedReply for register, got {error:?}"
);
}
#[tokio::test]
async fn add_item_returns_malformed_reply_when_ok_reply_has_no_payload() {
let state = Arc::new(FakeState::default());
*state.invoke_override.lock().await = Some(InvokeOverride::OkReplyNoPayload);
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let error = session.add_item(12, "Plant.Area.Tag").await.unwrap_err();
assert!(
matches!(&error, Error::MalformedReply { detail } if detail.contains("AddItem")),
"expected MalformedReply for add_item, got {error:?}"
);
}
#[tokio::test]
async fn add_item2_returns_malformed_reply_when_ok_reply_has_no_payload() {
let state = Arc::new(FakeState::default());
*state.invoke_override.lock().await = Some(InvokeOverride::OkReplyNoPayload);
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let error = session
.add_item2(12, "Plant.Area.Tag", "ctx")
.await
.unwrap_err();
assert!(
matches!(&error, Error::MalformedReply { detail } if detail.contains("AddItem2")),
"expected MalformedReply for add_item2, got {error:?}"
);
}
#[tokio::test]
async fn subscribe_bulk_returns_malformed_reply_on_mismatched_payload_arm() {
let state = Arc::new(FakeState::default());
*state.invoke_override.lock().await = Some(InvokeOverride::OkReplyWrongPayloadForBulk);
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let error = session
.subscribe_bulk(12, vec!["Tank01.Level".to_owned()])
.await
.unwrap_err();
assert!(
matches!(&error, Error::MalformedReply { detail } if detail.contains("bulk")),
"expected MalformedReply for subscribe_bulk, got {error:?}"
);
}
#[tokio::test]
async fn write_bulk_returns_malformed_reply_on_mismatched_payload_arm() {
let state = Arc::new(FakeState::default());
*state.invoke_override.lock().await = Some(InvokeOverride::OkReplyWrongPayloadForBulkWrite);
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let error = session
.write_bulk(
12,
vec![WriteBulkEntry {
item_handle: 901,
value: Some(int_value(11)),
user_id: 5,
}],
)
.await
.unwrap_err();
assert!(
matches!(&error, Error::MalformedReply { detail } if detail.contains("bulk write")),
"expected MalformedReply for write_bulk, got {error:?}"
);
}
#[tokio::test]
async fn read_bulk_returns_malformed_reply_on_mismatched_payload_arm() {
let state = Arc::new(FakeState::default());
*state.invoke_override.lock().await = Some(InvokeOverride::OkReplyWrongPayloadForReadBulk);
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let error = session
.read_bulk(12, &["Tank01.Level"], 500)
.await
.unwrap_err();
assert!(
matches!(&error, Error::MalformedReply { detail } if detail.contains("ReadBulk")),
"expected MalformedReply for read_bulk, got {error:?}"
);
}
#[tokio::test]
async fn unary_invoke_maps_status_unavailable_to_error_unavailable() {
let state = Arc::new(FakeState::default());
*state.invoke_override.lock().await =
Some(InvokeOverride::Unavailable("gateway restarting".to_owned()));
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let error = session.add_item(12, "Plant.Area.Tag").await.unwrap_err();
assert!(
matches!(&error, Error::Unavailable { .. }),
"expected Error::Unavailable for unary unavailable, got {error:?}"
);
}
#[tokio::test]
async fn write2_bulk_round_trips_through_the_fake_gateway() {
let state = Arc::new(FakeState::default());
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let results = session
.write2_bulk(
12,
vec![Write2BulkEntry {
item_handle: 901,
value: Some(int_value(11)),
timestamp_value: Some(int_value(0)),
user_id: 5,
}],
)
.await
.unwrap();
assert_eq!(results.len(), 2);
assert!(results[0].was_successful);
assert!(!results[1].was_successful);
let last_command = state.last_command_kind.lock().await;
assert_eq!(*last_command, Some(MxCommandKind::Write2Bulk as i32));
}
#[tokio::test]
async fn write_secured_bulk_round_trips_through_the_fake_gateway() {
let state = Arc::new(FakeState::default());
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let results = session
.write_secured_bulk(
12,
vec![WriteSecuredBulkEntry {
item_handle: 901,
current_user_id: 7,
verifier_user_id: 9,
value: Some(int_value(11)),
}],
)
.await
.unwrap();
assert_eq!(results.len(), 2);
assert!(results[0].was_successful);
let last_command = state.last_command_kind.lock().await;
assert_eq!(*last_command, Some(MxCommandKind::WriteSecuredBulk as i32));
}
#[tokio::test]
async fn write_secured2_bulk_round_trips_through_the_fake_gateway() {
let state = Arc::new(FakeState::default());
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let results = session
.write_secured2_bulk(
12,
vec![WriteSecured2BulkEntry {
item_handle: 901,
current_user_id: 7,
verifier_user_id: 9,
value: Some(int_value(11)),
timestamp_value: Some(int_value(0)),
}],
)
.await
.unwrap();
assert_eq!(results.len(), 2);
assert!(results[0].was_successful);
let last_command = state.last_command_kind.lock().await;
assert_eq!(*last_command, Some(MxCommandKind::WriteSecured2Bulk as i32));
}
#[derive(Default)]
struct FakeState {
authorization: Mutex<Option<String>>,
@@ -400,6 +636,39 @@ struct FakeState {
last_read_bulk_timeout_ms: Mutex<Option<u32>>,
stream_dropped: Arc<AtomicBool>,
emit_stream_fault: AtomicBool,
/// Test-injected override for the next (and all subsequent) `Invoke`
/// calls. When `Some`, the fake gateway returns the override's response
/// instead of its default per-kind reply. Used by the malformed-reply
/// and unary-Unavailable tests; default `None` preserves existing
/// happy-path test behaviour.
invoke_override: Mutex<Option<InvokeOverride>>,
}
/// Test-injected override for the fake gateway's `Invoke` handler.
///
/// Each variant short-circuits the per-kind dispatch in `FakeGateway::invoke`
/// and reproduces one of the wire shapes the Rust client's error paths must
/// handle. The bool tags the OK reply variants as "OK envelope, payload
/// missing/wrong" — the exact condition the new `Error::MalformedReply`
/// paths in `session.rs` are designed to catch.
#[derive(Clone)]
enum InvokeOverride {
/// Return `Status::unavailable(message)` from the unary Invoke RPC, so
/// the client maps it to `Error::Unavailable`.
Unavailable(String),
/// Return an OK `MxCommandReply` whose `payload` field is `None`. Used
/// to exercise `register_server_handle` / `add_item_handle` /
/// `add_item2_handle` falling through to the `MalformedReply` arm.
OkReplyNoPayload,
/// Return an OK reply whose payload arm does not match the bulk-read
/// command, so `read_bulk` falls through to its `MalformedReply` arm.
OkReplyWrongPayloadForReadBulk,
/// Return an OK reply whose payload arm does not match the requested
/// bulk command, so `bulk_results` falls through to `MalformedReply`.
OkReplyWrongPayloadForBulk,
/// Return an OK reply whose payload arm does not match the requested
/// bulk-write command, so `bulk_write_results` returns `MalformedReply`.
OkReplyWrongPayloadForBulkWrite,
}
#[derive(Clone)]
@@ -453,6 +722,58 @@ impl MxAccessGateway for FakeGateway {
.unwrap_or_default();
*self.state.last_command_kind.lock().await = Some(kind);
if let Some(override_) = self.state.invoke_override.lock().await.clone() {
return match override_ {
InvokeOverride::Unavailable(message) => Err(Status::unavailable(message)),
InvokeOverride::OkReplyNoPayload => Ok(Response::new(MxCommandReply {
session_id: request.session_id,
correlation_id: "fake-correlation".to_owned(),
kind,
protocol_status: Some(ok_status("command ok but payload omitted")),
payload: None,
..MxCommandReply::default()
})),
InvokeOverride::OkReplyWrongPayloadForReadBulk => {
Ok(Response::new(MxCommandReply {
session_id: request.session_id,
correlation_id: "fake-correlation".to_owned(),
kind,
protocol_status: Some(ok_status("read-bulk wrong payload arm")),
// AddItem payload arm against a ReadBulk request:
// the client's `read_bulk` matcher must reject it.
payload: Some(mx_command_reply::Payload::AddItem(AddItemReply {
item_handle: 0,
})),
..MxCommandReply::default()
}))
}
InvokeOverride::OkReplyWrongPayloadForBulk => Ok(Response::new(MxCommandReply {
session_id: request.session_id,
correlation_id: "fake-correlation".to_owned(),
kind,
protocol_status: Some(ok_status("bulk wrong payload arm")),
// AddItem payload arm against a SubscribeBulk request.
payload: Some(mx_command_reply::Payload::AddItem(AddItemReply {
item_handle: 0,
})),
..MxCommandReply::default()
})),
InvokeOverride::OkReplyWrongPayloadForBulkWrite => {
Ok(Response::new(MxCommandReply {
session_id: request.session_id,
correlation_id: "fake-correlation".to_owned(),
kind,
protocol_status: Some(ok_status("bulk-write wrong payload arm")),
// AddItem payload arm against a WriteBulk request.
payload: Some(mx_command_reply::Payload::AddItem(AddItemReply {
item_handle: 0,
})),
..MxCommandReply::default()
}))
}
};
}
if kind == MxCommandKind::Write as i32 {
return Ok(Response::new(mxaccess_failure_reply()));
}
@@ -478,36 +799,41 @@ impl MxAccessGateway for FakeGateway {
}));
}
// All four bulk-write families return `BulkWriteReply` over the
// wire and only differ by which `payload` arm carries it. The
// round-trip tests below want one entry per family, so wire them
// all up to the same canned reply (one success + one failure) and
// pick the matching payload arm by kind.
if kind == MxCommandKind::WriteBulk as i32 {
// Echo one success and one failure so the test can assert the per-entry
// shape and verify the call did not throw on per-entry failure.
return Ok(Response::new(MxCommandReply {
session_id: request.session_id,
correlation_id: "fake-correlation".to_owned(),
return Ok(Response::new(bulk_write_envelope(
request.session_id,
kind,
protocol_status: Some(ok_status("command ok")),
payload: Some(mx_command_reply::Payload::WriteBulk(BulkWriteReply {
results: vec![
BulkWriteResult {
server_handle: 12,
item_handle: 901,
was_successful: true,
hresult: None,
statuses: vec![],
error_message: String::new(),
},
BulkWriteResult {
server_handle: 12,
item_handle: 902,
was_successful: false,
hresult: None,
statuses: vec![],
error_message: "invalid handle".to_owned(),
},
],
})),
..MxCommandReply::default()
}));
mx_command_reply::Payload::WriteBulk(canned_bulk_write_reply()),
)));
}
if kind == MxCommandKind::Write2Bulk as i32 {
return Ok(Response::new(bulk_write_envelope(
request.session_id,
kind,
mx_command_reply::Payload::Write2Bulk(canned_bulk_write_reply()),
)));
}
if kind == MxCommandKind::WriteSecuredBulk as i32 {
return Ok(Response::new(bulk_write_envelope(
request.session_id,
kind,
mx_command_reply::Payload::WriteSecuredBulk(canned_bulk_write_reply()),
)));
}
if kind == MxCommandKind::WriteSecured2Bulk as i32 {
return Ok(Response::new(bulk_write_envelope(
request.session_id,
kind,
mx_command_reply::Payload::WriteSecured2Bulk(canned_bulk_write_reply()),
)));
}
if kind == MxCommandKind::ReadBulk as i32 {
@@ -699,6 +1025,44 @@ fn mxaccess_failure_reply() -> MxCommandReply {
}
}
fn canned_bulk_write_reply() -> BulkWriteReply {
BulkWriteReply {
results: vec![
BulkWriteResult {
server_handle: 12,
item_handle: 901,
was_successful: true,
hresult: None,
statuses: vec![],
error_message: String::new(),
},
BulkWriteResult {
server_handle: 12,
item_handle: 902,
was_successful: false,
hresult: None,
statuses: vec![],
error_message: "invalid handle".to_owned(),
},
],
}
}
fn bulk_write_envelope(
session_id: String,
kind: i32,
payload: mx_command_reply::Payload,
) -> MxCommandReply {
MxCommandReply {
session_id,
correlation_id: "fake-correlation".to_owned(),
kind,
protocol_status: Some(ok_status("command ok")),
payload: Some(payload),
..MxCommandReply::default()
}
}
fn event(sequence: u64) -> MxEvent {
MxEvent {
family: MxEventFamily::OnDataChange as i32,