From 82996aa8e6bff139836d386614b8a9bcc82ed80f Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 24 May 2026 08:49:58 -0400 Subject: [PATCH] Resolve Client.Go-022..027: bulk flags, bench cancel, batch loop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Client.Go-022 Re-applied Client.Go-015 shape — runWriteBulkVariant drops the unused secured param and gates -current-user-id / -verifier-user-id / -user-id behind the secured-only variants. Client.Go-023 Re-applied Client.Go-018 shape — bench warm-up and steady- state loops respect ctx.Err(). Client.Go-024 Added SDK-level tests for WriteBulk / Write2Bulk / WriteSecuredBulk / WriteSecured2Bulk / ReadBulk and StreamAlarms via the existing bufconn fake gateway pattern. Client.Go-025 Five bulk SDK methods short-circuit on empty input without an RPC round-trip and document the behavior. Client.Go-026 runBatch widens scanner.Buffer to 16 MiB and emits an error-with-sentinel if a longer line still arrives, rather than aborting the session silently. Client.Go-027 runBatch treats blank lines as skip-and-continue; only EOF ends the session. All resolved at 2026-05-24; gofmt + go vet + go build + go test ./... all green. Co-Authored-By: Claude Opus 4.7 (1M context) --- clients/go/cmd/mxgw-go/main.go | 79 ++++++-- clients/go/cmd/mxgw-go/main_test.go | 210 ++++++++++++++++++++ clients/go/mxgateway/alarms_test.go | 75 +++++++ clients/go/mxgateway/client_session_test.go | 200 +++++++++++++++++++ clients/go/mxgateway/session.go | 31 +++ code-reviews/Client.Go/findings.md | 26 +-- 6 files changed, 588 insertions(+), 33 deletions(-) diff --git a/clients/go/cmd/mxgw-go/main.go b/clients/go/cmd/mxgw-go/main.go index b736cd5..cfab46c 100644 --- a/clients/go/cmd/mxgw-go/main.go +++ b/clients/go/cmd/mxgw-go/main.go @@ -396,25 +396,31 @@ func runReadBulk(ctx context.Context, args []string, stdout, stderr io.Writer) e } func runWriteBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error { - return runWriteBulkVariant(ctx, args, stdout, stderr, "write-bulk", false, false) + return runWriteBulkVariant(ctx, args, stdout, stderr, "write-bulk", false) } func runWrite2Bulk(ctx context.Context, args []string, stdout, stderr io.Writer) error { - return runWriteBulkVariant(ctx, args, stdout, stderr, "write2-bulk", true, false) + return runWriteBulkVariant(ctx, args, stdout, stderr, "write2-bulk", true) } func runWriteSecuredBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error { - return runWriteBulkVariant(ctx, args, stdout, stderr, "write-secured-bulk", false, true) + return runWriteBulkVariant(ctx, args, stdout, stderr, "write-secured-bulk", false) } func runWriteSecured2Bulk(ctx context.Context, args []string, stdout, stderr io.Writer) error { - return runWriteBulkVariant(ctx, args, stdout, stderr, "write-secured2-bulk", true, true) + return runWriteBulkVariant(ctx, args, stdout, stderr, "write-secured2-bulk", true) } // runWriteBulkVariant shares the flag-parsing + entry-build skeleton across -// the four bulk-write families. withTimestamp adds a --timestamp-value flag; -// secured switches from --user-id to --current-user-id / --verifier-user-id. -func runWriteBulkVariant(ctx context.Context, args []string, stdout, stderr io.Writer, command string, withTimestamp bool, secured bool) error { +// the four bulk-write families. The variant is derived from command alone; +// withTimestamp adds a --timestamp-value flag. To keep wrong-variant flags +// from silently no-op'ing, secured-only flags (-current-user-id / +// -verifier-user-id) are only registered for the secured variants, and +// -user-id only for the non-secured Write/Write2 variants — a wrong-variant +// flag then surfaces as a clean "flag provided but not defined" error. +func runWriteBulkVariant(ctx context.Context, args []string, stdout, stderr io.Writer, command string, withTimestamp bool) error { + secured := command == "write-secured-bulk" || command == "write-secured2-bulk" + flags := flag.NewFlagSet(command, flag.ContinueOnError) flags.SetOutput(stderr) common := bindCommonFlags(flags) @@ -424,9 +430,17 @@ func runWriteBulkVariant(ctx context.Context, args []string, stdout, stderr io.W itemHandles := flags.String("item-handles", "", "comma-separated item handles") valueType := flags.String("type", "string", "value type: bool, int32, int64, float, double, string") values := flags.String("values", "", "comma-separated values (one per item handle)") - userID := flags.Int("user-id", 0, "MXAccess user id (Write/Write2 variants)") - currentUserID := flags.Int("current-user-id", 0, "MXAccess current user id (Secured variants)") - verifierUserID := flags.Int("verifier-user-id", 0, "MXAccess verifier user id (Secured variants)") + var ( + userID *int + currentUserID *int + verifierUserID *int + ) + if secured { + currentUserID = flags.Int("current-user-id", 0, "MXAccess current user id (Secured variants)") + verifierUserID = flags.Int("verifier-user-id", 0, "MXAccess verifier user id (Secured variants)") + } else { + userID = flags.Int("user-id", 0, "MXAccess user id (Write/Write2 variants)") + } timestampValue := flags.String("timestamp-value", "", "RFC 3339 timestamp shared across all entries (Write2/WriteSecured2 variants)") if err := flags.Parse(args); err != nil { @@ -514,7 +528,6 @@ func runWriteBulkVariant(ctx context.Context, args []string, stdout, stderr io.W default: return fmt.Errorf("unsupported bulk write command %q", command) } - _ = secured // currently only used for routing above; reserved for future per-variant validation return writeWriteBulkOutput(stdout, *jsonOutput, command, options, results, err) } @@ -598,10 +611,12 @@ func runBenchReadBulk(ctx context.Context, args []string, stdout, stderr io.Writ }() // Warm-up: drive identical calls so any first-call JIT / connection-pool - // setup is amortised before the measurement window opens. + // setup is amortised before the measurement window opens. The ctx.Err() + // guard short-circuits on Ctrl+C / parent-cancel instead of spinning + // failing ReadBulk calls until the wall-clock deadline elapses. warmupDeadline := time.Now().Add(time.Duration(*warmupSeconds) * time.Second) timeout := time.Duration(*timeoutMs) * time.Millisecond - for time.Now().Before(warmupDeadline) { + for time.Now().Before(warmupDeadline) && ctx.Err() == nil { _, _ = session.ReadBulk(ctx, serverHandle, tags, timeout) } @@ -613,7 +628,7 @@ func runBenchReadBulk(ctx context.Context, args []string, stdout, stderr io.Writ steadyStart := time.Now() steadyDeadline := steadyStart.Add(time.Duration(*durationSeconds) * time.Second) - for time.Now().Before(steadyDeadline) { + for time.Now().Before(steadyDeadline) && ctx.Err() == nil { callStart := time.Now() results, err := session.ReadBulk(ctx, serverHandle, tags, timeout) elapsed := time.Since(callStart) @@ -1191,18 +1206,28 @@ const batchEOR = "__MXGW_BATCH_EOR__" // runBatch reads one command line at a time from in, dispatches each via the // normal runWithIO routing, and writes a batchEOR sentinel to stdout after // every result. Errors are serialised as JSON to stdout (not stderr) so the -// harness can parse them without interleaving stderr. The loop never terminates -// on command error; only stdin EOF (or an empty line) ends the session. +// harness can parse them without interleaving stderr. Blank lines are +// skipped; only stdin EOF ends the session. +// +// The scanner buffer is widened to 16 MiB so a single long command line +// (e.g. a bulk-write with several thousand handles) does not trip the +// default 64 KiB bufio.Scanner token-too-long error and abort the session. +// If a line still exceeds the cap, the error is surfaced as a per-command +// error-with-sentinel and the session continues. func runBatch(ctx context.Context, in io.Reader, stdout, stderr io.Writer) error { bw := bufio.NewWriter(stdout) scanner := bufio.NewScanner(in) - for scanner.Scan() { - line := scanner.Text() - if line == "" { + scanner.Buffer(make([]byte, 0, 64*1024), 16*1024*1024) + for { + if !scanner.Scan() { break } + line := scanner.Text() args := strings.Fields(line) if len(args) == 0 { + // Skip blank / whitespace-only lines; do NOT terminate. The + // session ends only on stdin EOF so a stray blank line in a + // PowerShell here-string does not silently drop later commands. continue } if err := runWithIO(ctx, args, bw, stderr); err != nil { @@ -1217,7 +1242,21 @@ func runBatch(ctx context.Context, in io.Reader, stdout, stderr io.Writer) error _, _ = fmt.Fprintln(bw, batchEOR) _ = bw.Flush() } - return scanner.Err() + if err := scanner.Err(); err != nil { + // Emit the scanner failure as a final error-with-sentinel so the + // harness sees the failure framed, then return the error so the + // process exit reflects it. This handles bufio.ErrTooLong for any + // pathological line above the 16 MiB cap. + errPayload := map[string]string{ + "error": err.Error(), + "type": "error", + } + _ = writeJSON(bw, errPayload) + _, _ = fmt.Fprintln(bw, batchEOR) + _ = bw.Flush() + return err + } + return nil } func dialGalaxyForCommand(ctx context.Context, common *commonOptions) (*mxgateway.GalaxyClient, commonOptions, error) { diff --git a/clients/go/cmd/mxgw-go/main_test.go b/clients/go/cmd/mxgw-go/main_test.go index f34292a..67551dc 100644 --- a/clients/go/cmd/mxgw-go/main_test.go +++ b/clients/go/cmd/mxgw-go/main_test.go @@ -2,9 +2,15 @@ package main import ( "bytes" + "context" "encoding/json" + "net" "strings" "testing" + "time" + + pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated" + "google.golang.org/grpc" ) func TestRunVersionJSON(t *testing.T) { @@ -84,3 +90,207 @@ func TestParseValueBuildsTypedValue(t *testing.T) { t.Fatalf("int32 value = %d, want 123", got) } } + +// TestRunWriteBulkVariantGatesSecuredFlags pins the Client.Go-022 fix: +// secured-only flags must be unavailable on non-secured variants, and +// vice-versa, so a wrong-variant flag fails with a clean "flag provided +// but not defined" error instead of silently no-op'ing. +func TestRunWriteBulkVariantGatesSecuredFlags(t *testing.T) { + cases := []struct { + name string + args []string + }{ + { + name: "write-bulk-rejects-current-user-id", + args: []string{"write-bulk", "-current-user-id", "5", "-item-handles", "1", "-values", "1"}, + }, + { + name: "write-bulk-rejects-verifier-user-id", + args: []string{"write-bulk", "-verifier-user-id", "5", "-item-handles", "1", "-values", "1"}, + }, + { + name: "write2-bulk-rejects-current-user-id", + args: []string{"write2-bulk", "-current-user-id", "5", "-item-handles", "1", "-values", "1"}, + }, + { + name: "write-secured-bulk-rejects-user-id", + args: []string{"write-secured-bulk", "-user-id", "5", "-item-handles", "1", "-values", "1"}, + }, + { + name: "write-secured2-bulk-rejects-user-id", + args: []string{"write-secured2-bulk", "-user-id", "5", "-item-handles", "1", "-values", "1"}, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + var stdout, stderr bytes.Buffer + err := runWithIO(t.Context(), tc.args, &stdout, &stderr) + if err == nil { + t.Fatalf("runWithIO(%v) returned no error", tc.args) + } + if !strings.Contains(err.Error(), "flag provided but not defined") { + t.Fatalf("runWithIO(%v) error = %v; want 'flag provided but not defined'", tc.args, err) + } + }) + } +} + +// TestRunBenchReadBulkRespectsContextCancellation pins the Client.Go-023 +// fix: the warm-up and steady-state wall-clock loops must honour ctx.Err() +// so an external cancel (Ctrl+C, parent-cancel from a cross-language bench +// driver) short-circuits the bench instead of spinning failing ReadBulk +// calls until the wall-clock deadline elapses. +func TestRunBenchReadBulkRespectsContextCancellation(t *testing.T) { + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + server := grpc.NewServer() + fake := &benchFakeGateway{} + pb.RegisterMxAccessGatewayServer(server, fake) + go func() { + _ = server.Serve(listener) + }() + defer server.Stop() + defer listener.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Long warm-up + duration, so if the ctx.Err() guard were missing the + // loops would run for ~10s. With the guard, the cancel below short- + // circuits both loops within ~one ReadBulk iteration. + args := []string{ + "bench-read-bulk", + "-endpoint", listener.Addr().String(), + "-plaintext", + "-api-key", "test", + "-warmup-seconds", "5", + "-duration-seconds", "5", + "-bulk-size", "1", + "-timeout-ms", "100", + } + + // Cancel after a brief delay — far less than warmup+duration (10s). + go func() { + time.Sleep(150 * time.Millisecond) + cancel() + }() + + var stdout, stderr bytes.Buffer + start := time.Now() + err = runWithIO(ctx, args, &stdout, &stderr) + elapsed := time.Since(start) + + // With the ctx.Err() guard, the loops exit well before the wall-clock + // deadlines (warmup=5s + duration=5s = 10s). Allow generous slack for + // CI noise but assert clearly less than the un-guarded worst case. + if elapsed > 4*time.Second { + t.Fatalf("bench-read-bulk took %s after ctx cancel; want <4s (ctx.Err() guard missing?). err=%v stderr=%s", elapsed, err, stderr.String()) + } +} + +// benchFakeGateway is a minimal MxAccessGatewayServer that satisfies the +// bench-read-bulk session-setup sequence (OpenSession + Invoke for Register +// / SubscribeBulk / ReadBulk / UnsubscribeBulk / CloseSession). +type benchFakeGateway struct { + pb.UnimplementedMxAccessGatewayServer +} + +func (g *benchFakeGateway) OpenSession(_ context.Context, _ *pb.OpenSessionRequest) (*pb.OpenSessionReply, error) { + return &pb.OpenSessionReply{ + SessionId: "bench-session", + ProtocolStatus: &pb.ProtocolStatus{Code: pb.ProtocolStatusCode_PROTOCOL_STATUS_CODE_OK}, + }, nil +} + +func (g *benchFakeGateway) CloseSession(_ context.Context, req *pb.CloseSessionRequest) (*pb.CloseSessionReply, error) { + return &pb.CloseSessionReply{ + SessionId: req.GetSessionId(), + ProtocolStatus: &pb.ProtocolStatus{Code: pb.ProtocolStatusCode_PROTOCOL_STATUS_CODE_OK}, + }, nil +} + +func (g *benchFakeGateway) Invoke(_ context.Context, req *pb.MxCommandRequest) (*pb.MxCommandReply, error) { + kind := req.GetCommand().GetKind() + reply := &pb.MxCommandReply{ + SessionId: req.GetSessionId(), + Kind: kind, + ProtocolStatus: &pb.ProtocolStatus{Code: pb.ProtocolStatusCode_PROTOCOL_STATUS_CODE_OK}, + } + switch kind { + case pb.MxCommandKind_MX_COMMAND_KIND_REGISTER: + reply.Payload = &pb.MxCommandReply_Register{Register: &pb.RegisterReply{ServerHandle: 1}} + case pb.MxCommandKind_MX_COMMAND_KIND_SUBSCRIBE_BULK: + reply.Payload = &pb.MxCommandReply_SubscribeBulk{SubscribeBulk: &pb.BulkSubscribeReply{ + Results: []*pb.SubscribeResult{{ServerHandle: 1, ItemHandle: 1, WasSuccessful: true}}, + }} + case pb.MxCommandKind_MX_COMMAND_KIND_READ_BULK: + reply.Payload = &pb.MxCommandReply_ReadBulk{ReadBulk: &pb.BulkReadReply{ + Results: []*pb.BulkReadResult{{ItemHandle: 1, WasSuccessful: true, WasCached: true}}, + }} + case pb.MxCommandKind_MX_COMMAND_KIND_UNSUBSCRIBE_BULK: + reply.Payload = &pb.MxCommandReply_UnsubscribeBulk{UnsubscribeBulk: &pb.BulkSubscribeReply{}} + } + return reply, nil +} + +// TestRunBenchReadBulkRejectsNonPositiveBulkSize pins the Client.Go-023-adjacent +// positivity checks so they cannot drift while resolving the cancellation finding. +func TestRunBenchReadBulkRejectsNonPositiveBulkSize(t *testing.T) { + var stdout, stderr bytes.Buffer + err := runWithIO(t.Context(), []string{"bench-read-bulk", "-bulk-size", "0"}, &stdout, &stderr) + if err == nil || !strings.Contains(err.Error(), "bulk-size must be positive") { + t.Fatalf("bench-read-bulk -bulk-size 0 error = %v", err) + } +} + +// TestRunBatchSkipsBlankLinesAndContinuesUntilEOF pins the Client.Go-027 fix: +// a blank line in the middle of a batch session must NOT terminate the loop — +// only stdin EOF ends the session. +func TestRunBatchSkipsBlankLinesAndContinuesUntilEOF(t *testing.T) { + var stdout, stderr bytes.Buffer + + // version -> blank -> version (a stray blank line in the middle of a + // programmatic session). + in := strings.NewReader("version --json\n\nversion --json\n") + if err := runBatch(t.Context(), in, &stdout, &stderr); err != nil { + t.Fatalf("runBatch() error = %v; stderr = %s", err, stderr.String()) + } + + out := stdout.String() + // Both version commands must have produced a result before the EOR sentinel. + if count := strings.Count(out, batchEOR); count != 2 { + t.Fatalf("EOR sentinel count = %d, want 2 (one per command, blank line skipped); out = %q", count, out) + } +} + +// TestRunBatchHandlesLongCommandLine pins the Client.Go-026 fix: a command +// line longer than the default bufio.Scanner token size (64 KiB) must not +// abort the batch session. +func TestRunBatchHandlesLongCommandLine(t *testing.T) { + var stdout, stderr bytes.Buffer + + // Build a single command line larger than 64 KiB. The command itself is + // invalid (no real session) but runBatch must still emit an EOR sentinel + // and continue to the next command rather than dropping the line on the + // floor with a bufio.ErrTooLong from the outer return. + huge := strings.Repeat("tag-with-a-reasonably-long-name-and-suffix,", 2000) + "trailing" + line := "subscribe-bulk -session-id none -items " + huge + if len(line) <= 64*1024 { + t.Fatalf("test setup error: long line length = %d, want > 64KiB", len(line)) + } + in := strings.NewReader(line + "\nversion --json\n") + + if err := runBatch(t.Context(), in, &stdout, &stderr); err != nil { + t.Fatalf("runBatch() error = %v; stderr = %s", err, stderr.String()) + } + + out := stdout.String() + // Both commands must produce an EOR sentinel — the long line should be a + // per-command error (still emitted with EOR), then the version command + // should run normally. + if count := strings.Count(out, batchEOR); count != 2 { + t.Fatalf("EOR sentinel count = %d, want 2 (one per command, even when first is too long); out length = %d", count, len(out)) + } +} diff --git a/clients/go/mxgateway/alarms_test.go b/clients/go/mxgateway/alarms_test.go index 3a63902..83005db 100644 --- a/clients/go/mxgateway/alarms_test.go +++ b/clients/go/mxgateway/alarms_test.go @@ -168,6 +168,66 @@ func TestQueryActiveAlarmsPassesFilterPrefix(t *testing.T) { } } +func TestStreamAlarmsPassesFilterPrefixAndReceivesFeedMessages(t *testing.T) { + fake := &fakeGatewayWithAlarms{ + feedMessages: []*pb.AlarmFeedMessage{ + { + Payload: &pb.AlarmFeedMessage_ActiveAlarm{ + ActiveAlarm: &pb.ActiveAlarmSnapshot{ + AlarmFullReference: "Tank01.Level.HiHi", + CurrentState: pb.AlarmConditionState_ALARM_CONDITION_STATE_ACTIVE, + }, + }, + }, + { + Payload: &pb.AlarmFeedMessage_SnapshotComplete{ + SnapshotComplete: true, + }, + }, + }, + } + client, cleanup := newBufconnClientWithAlarms(t, fake) + defer cleanup() + + stream, err := client.StreamAlarms(context.Background(), &pb.StreamAlarmsRequest{ + AlarmFilterPrefix: "Tank01.", + }) + if err != nil { + t.Fatalf("StreamAlarms() error = %v", err) + } + + var received []*pb.AlarmFeedMessage + for { + msg, err := stream.Recv() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + t.Fatalf("stream.Recv() error = %v", err) + } + received = append(received, msg) + } + if len(received) != 2 { + t.Fatalf("received count = %d, want 2", len(received)) + } + if got := fake.streamRequest.GetAlarmFilterPrefix(); got != "Tank01." { + t.Fatalf("captured filter prefix = %q", got) + } + if got := fake.streamAuth; got != "Bearer test-api-key" { + t.Fatalf("stream authorization metadata = %q", got) + } +} + +func TestStreamAlarmsRejectsNilRequest(t *testing.T) { + fake := &fakeGatewayWithAlarms{} + client, cleanup := newBufconnClientWithAlarms(t, fake) + defer cleanup() + + if _, err := client.StreamAlarms(context.Background(), nil); err == nil { + t.Fatal("StreamAlarms(nil) returned no error") + } +} + type fakeGatewayWithAlarms struct { pb.UnimplementedMxAccessGatewayServer @@ -178,6 +238,10 @@ type fakeGatewayWithAlarms struct { queryRequest *pb.QueryActiveAlarmsRequest activeSnapshots []*pb.ActiveAlarmSnapshot + + streamRequest *pb.StreamAlarmsRequest + feedMessages []*pb.AlarmFeedMessage + streamAuth string } func (s *fakeGatewayWithAlarms) AcknowledgeAlarm(ctx context.Context, req *pb.AcknowledgeAlarmRequest) (*pb.AcknowledgeAlarmReply, error) { @@ -207,6 +271,17 @@ func (s *fakeGatewayWithAlarms) QueryActiveAlarms(req *pb.QueryActiveAlarmsReque return nil } +func (s *fakeGatewayWithAlarms) StreamAlarms(req *pb.StreamAlarmsRequest, stream grpc.ServerStreamingServer[pb.AlarmFeedMessage]) error { + s.streamRequest = req + s.streamAuth = authorizationFromContext(stream.Context()) + for _, msg := range s.feedMessages { + if err := stream.Send(msg); err != nil { + return err + } + } + return nil +} + func newBufconnClientWithAlarms(t *testing.T, fake *fakeGatewayWithAlarms) (*Client, func()) { t.Helper() listener := bufconn.Listen(bufSize) diff --git a/clients/go/mxgateway/client_session_test.go b/clients/go/mxgateway/client_session_test.go index b1577b4..343aed7 100644 --- a/clients/go/mxgateway/client_session_test.go +++ b/clients/go/mxgateway/client_session_test.go @@ -230,6 +230,206 @@ func TestSubscribeBulkBuildsOneBulkCommandAndReturnsResults(t *testing.T) { } } +func TestWriteBulkBuildsOneBulkCommandAndReturnsPerEntryResults(t *testing.T) { + fake := &fakeGatewayServer{ + invokeReply: &pb.MxCommandReply{ + SessionId: "session-1", + Kind: pb.MxCommandKind_MX_COMMAND_KIND_WRITE_BULK, + ProtocolStatus: &pb.ProtocolStatus{ + Code: pb.ProtocolStatusCode_PROTOCOL_STATUS_CODE_OK, + }, + Payload: &pb.MxCommandReply_WriteBulk{ + WriteBulk: &pb.BulkWriteReply{ + Results: []*pb.BulkWriteResult{ + {ItemHandle: 10, WasSuccessful: true}, + {ItemHandle: 11, WasSuccessful: true}, + }, + }, + }, + }, + } + client, cleanup := newBufconnClient(t, fake) + defer cleanup() + session := NewSessionForID(client, "session-1") + + entries := []*WriteBulkEntry{ + {ItemHandle: 10, Value: Int32Value(7), UserId: 100}, + {ItemHandle: 11, Value: Int32Value(8), UserId: 100}, + } + results, err := session.WriteBulk(context.Background(), 12, entries) + if err != nil { + t.Fatalf("WriteBulk() error = %v", err) + } + if len(results) != 2 { + t.Fatalf("results len = %d, want 2", len(results)) + } + req := fake.invokeRequest + if req.GetCommand().GetKind() != pb.MxCommandKind_MX_COMMAND_KIND_WRITE_BULK { + t.Fatalf("command kind = %s", req.GetCommand().GetKind()) + } + if got := req.GetCommand().GetWriteBulk().GetEntries(); len(got) != 2 { + t.Fatalf("entry count = %d, want 2", len(got)) + } +} + +func TestWriteBulkRejectsNilEntries(t *testing.T) { + fake := &fakeGatewayServer{} + client, cleanup := newBufconnClient(t, fake) + defer cleanup() + session := NewSessionForID(client, "session-1") + + if _, err := session.WriteBulk(context.Background(), 12, nil); err == nil { + t.Fatal("WriteBulk(nil) returned no error") + } + if _, err := session.Write2Bulk(context.Background(), 12, nil); err == nil { + t.Fatal("Write2Bulk(nil) returned no error") + } + if _, err := session.WriteSecuredBulk(context.Background(), 12, nil); err == nil { + t.Fatal("WriteSecuredBulk(nil) returned no error") + } + if _, err := session.WriteSecured2Bulk(context.Background(), 12, nil); err == nil { + t.Fatal("WriteSecured2Bulk(nil) returned no error") + } + if _, err := session.ReadBulk(context.Background(), 12, nil, 0); err == nil { + t.Fatal("ReadBulk(nil) returned no error") + } +} + +func TestBulkMethodsShortCircuitOnEmptySliceWithoutRoundTrip(t *testing.T) { + fake := &fakeGatewayServer{ + invokeReply: &pb.MxCommandReply{ + ProtocolStatus: &pb.ProtocolStatus{ + Code: pb.ProtocolStatusCode_PROTOCOL_STATUS_CODE_OK, + }, + }, + } + client, cleanup := newBufconnClient(t, fake) + defer cleanup() + session := NewSessionForID(client, "session-1") + + results, err := session.WriteBulk(context.Background(), 12, []*WriteBulkEntry{}) + if err != nil { + t.Fatalf("WriteBulk(empty) error = %v", err) + } + if len(results) != 0 { + t.Fatalf("WriteBulk(empty) results len = %d, want 0", len(results)) + } + if fake.invokeRequest != nil { + t.Fatal("WriteBulk(empty) sent a round trip; expected short-circuit") + } + + results2, err := session.Write2Bulk(context.Background(), 12, []*Write2BulkEntry{}) + if err != nil { + t.Fatalf("Write2Bulk(empty) error = %v", err) + } + if len(results2) != 0 { + t.Fatalf("Write2Bulk(empty) results len = %d, want 0", len(results2)) + } + if fake.invokeRequest != nil { + t.Fatal("Write2Bulk(empty) sent a round trip; expected short-circuit") + } + + results3, err := session.WriteSecuredBulk(context.Background(), 12, []*WriteSecuredBulkEntry{}) + if err != nil { + t.Fatalf("WriteSecuredBulk(empty) error = %v", err) + } + if len(results3) != 0 { + t.Fatalf("WriteSecuredBulk(empty) results len = %d, want 0", len(results3)) + } + if fake.invokeRequest != nil { + t.Fatal("WriteSecuredBulk(empty) sent a round trip; expected short-circuit") + } + + results4, err := session.WriteSecured2Bulk(context.Background(), 12, []*WriteSecured2BulkEntry{}) + if err != nil { + t.Fatalf("WriteSecured2Bulk(empty) error = %v", err) + } + if len(results4) != 0 { + t.Fatalf("WriteSecured2Bulk(empty) results len = %d, want 0", len(results4)) + } + if fake.invokeRequest != nil { + t.Fatal("WriteSecured2Bulk(empty) sent a round trip; expected short-circuit") + } + + readResults, err := session.ReadBulk(context.Background(), 12, []string{}, 0) + if err != nil { + t.Fatalf("ReadBulk(empty) error = %v", err) + } + if len(readResults) != 0 { + t.Fatalf("ReadBulk(empty) results len = %d, want 0", len(readResults)) + } + if fake.invokeRequest != nil { + t.Fatal("ReadBulk(empty) sent a round trip; expected short-circuit") + } +} + +func TestReadBulkForwardsTimeoutAndUnpacksCachedFlag(t *testing.T) { + fake := &fakeGatewayServer{ + invokeReply: &pb.MxCommandReply{ + SessionId: "session-1", + Kind: pb.MxCommandKind_MX_COMMAND_KIND_READ_BULK, + ProtocolStatus: &pb.ProtocolStatus{ + Code: pb.ProtocolStatusCode_PROTOCOL_STATUS_CODE_OK, + }, + Payload: &pb.MxCommandReply_ReadBulk{ + ReadBulk: &pb.BulkReadReply{ + Results: []*pb.BulkReadResult{ + {TagAddress: "Tank01.Level", WasSuccessful: true, WasCached: true}, + {TagAddress: "Tank02.Level", WasSuccessful: true, WasCached: false}, + }, + }, + }, + }, + } + client, cleanup := newBufconnClient(t, fake) + defer cleanup() + session := NewSessionForID(client, "session-1") + + results, err := session.ReadBulk(context.Background(), 12, []string{"Tank01.Level", "Tank02.Level"}, 250*time.Millisecond) + if err != nil { + t.Fatalf("ReadBulk() error = %v", err) + } + if len(results) != 2 { + t.Fatalf("results len = %d, want 2", len(results)) + } + if !results[0].GetWasCached() || results[1].GetWasCached() { + t.Fatalf("WasCached flags = [%v %v], want [true false]", results[0].GetWasCached(), results[1].GetWasCached()) + } + req := fake.invokeRequest + if req.GetCommand().GetKind() != pb.MxCommandKind_MX_COMMAND_KIND_READ_BULK { + t.Fatalf("command kind = %s", req.GetCommand().GetKind()) + } + if got := req.GetCommand().GetReadBulk().GetTimeoutMs(); got != 250 { + t.Fatalf("timeout ms = %d, want 250", got) + } +} + +func TestReadBulkSaturatesTimeoutAboveMaxUint32(t *testing.T) { + fake := &fakeGatewayServer{ + invokeReply: &pb.MxCommandReply{ + SessionId: "session-1", + Kind: pb.MxCommandKind_MX_COMMAND_KIND_READ_BULK, + ProtocolStatus: &pb.ProtocolStatus{ + Code: pb.ProtocolStatusCode_PROTOCOL_STATUS_CODE_OK, + }, + }, + } + client, cleanup := newBufconnClient(t, fake) + defer cleanup() + session := NewSessionForID(client, "session-1") + + // 100 days in milliseconds exceeds MaxUint32 (~49.7 days). + hugeTimeout := 100 * 24 * time.Hour + _, err := session.ReadBulk(context.Background(), 12, []string{"Tank01.Level"}, hugeTimeout) + if err != nil { + t.Fatalf("ReadBulk() error = %v", err) + } + got := fake.invokeRequest.GetCommand().GetReadBulk().GetTimeoutMs() + if got != ^uint32(0) { + t.Fatalf("timeout ms = %d, want %d (MaxUint32)", got, ^uint32(0)) + } +} + func TestInvokeReturnsTypedMxAccessErrorWithRawReply(t *testing.T) { hresult := int32(-2147467259) fake := &fakeGatewayServer{ diff --git a/clients/go/mxgateway/session.go b/clients/go/mxgateway/session.go index 7165c6f..f9e2393 100644 --- a/clients/go/mxgateway/session.go +++ b/clients/go/mxgateway/session.go @@ -392,6 +392,9 @@ func (s *Session) UnsubscribeBulk(ctx context.Context, serverHandle int32, itemH // Per-entry failures appear as BulkWriteResult entries with WasSuccessful=false; the call // never returns an error for per-entry MXAccess failures (it returns an error only for // protocol-level failures or transport errors). +// +// A non-nil but empty entries slice is treated as a no-op and returns an empty result +// without a wire round-trip; pass nil to surface a clear "entries are required" error. func (s *Session) WriteBulk(ctx context.Context, serverHandle int32, entries []*WriteBulkEntry) ([]*BulkWriteResult, error) { if entries == nil { return nil, errors.New("mxgateway: write bulk entries are required") @@ -399,6 +402,9 @@ func (s *Session) WriteBulk(ctx context.Context, serverHandle int32, entries []* if err := ensureBulkSize("write bulk entries", len(entries)); err != nil { return nil, err } + if len(entries) == 0 { + return []*BulkWriteResult{}, nil + } reply, err := s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_WRITE_BULK, Payload: &pb.MxCommand_WriteBulk{ @@ -415,6 +421,9 @@ func (s *Session) WriteBulk(ctx context.Context, serverHandle int32, entries []* } // Write2Bulk invokes MXAccess Write2 (timestamped) for each entry inside one gateway command. +// +// A non-nil but empty entries slice is treated as a no-op and returns an empty result +// without a wire round-trip; pass nil to surface a clear "entries are required" error. func (s *Session) Write2Bulk(ctx context.Context, serverHandle int32, entries []*Write2BulkEntry) ([]*BulkWriteResult, error) { if entries == nil { return nil, errors.New("mxgateway: write2 bulk entries are required") @@ -422,6 +431,9 @@ func (s *Session) Write2Bulk(ctx context.Context, serverHandle int32, entries [] if err := ensureBulkSize("write2 bulk entries", len(entries)); err != nil { return nil, err } + if len(entries) == 0 { + return []*BulkWriteResult{}, nil + } reply, err := s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_WRITE2_BULK, Payload: &pb.MxCommand_Write2Bulk{ @@ -439,6 +451,9 @@ func (s *Session) Write2Bulk(ctx context.Context, serverHandle int32, entries [] // WriteSecuredBulk invokes MXAccess WriteSecured for each entry. Credential-sensitive // values must not be logged by callers; mirrors the single-item WriteSecured contract. +// +// A non-nil but empty entries slice is treated as a no-op and returns an empty result +// without a wire round-trip; pass nil to surface a clear "entries are required" error. func (s *Session) WriteSecuredBulk(ctx context.Context, serverHandle int32, entries []*WriteSecuredBulkEntry) ([]*BulkWriteResult, error) { if entries == nil { return nil, errors.New("mxgateway: write-secured bulk entries are required") @@ -446,6 +461,9 @@ func (s *Session) WriteSecuredBulk(ctx context.Context, serverHandle int32, entr if err := ensureBulkSize("write-secured bulk entries", len(entries)); err != nil { return nil, err } + if len(entries) == 0 { + return []*BulkWriteResult{}, nil + } reply, err := s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_WRITE_SECURED_BULK, Payload: &pb.MxCommand_WriteSecuredBulk{ @@ -462,6 +480,9 @@ func (s *Session) WriteSecuredBulk(ctx context.Context, serverHandle int32, entr } // WriteSecured2Bulk invokes MXAccess WriteSecured2 (timestamped) for each entry. +// +// A non-nil but empty entries slice is treated as a no-op and returns an empty result +// without a wire round-trip; pass nil to surface a clear "entries are required" error. func (s *Session) WriteSecured2Bulk(ctx context.Context, serverHandle int32, entries []*WriteSecured2BulkEntry) ([]*BulkWriteResult, error) { if entries == nil { return nil, errors.New("mxgateway: write-secured2 bulk entries are required") @@ -469,6 +490,9 @@ func (s *Session) WriteSecured2Bulk(ctx context.Context, serverHandle int32, ent if err := ensureBulkSize("write-secured2 bulk entries", len(entries)); err != nil { return nil, err } + if len(entries) == 0 { + return []*BulkWriteResult{}, nil + } reply, err := s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_WRITE_SECURED2_BULK, Payload: &pb.MxCommand_WriteSecured2Bulk{ @@ -492,6 +516,10 @@ func (s *Session) WriteSecured2Bulk(ctx context.Context, serverHandle int32, ent // otherwise. timeout bounds the wait per tag in the snapshot case; pass zero to use the // worker default. Per-tag failures (timeout, invalid tag) appear as BulkReadResult entries // with WasSuccessful=false; the call never returns an error for per-tag MXAccess failures. +// +// A non-nil but empty tagAddresses slice is treated as a no-op and returns an empty +// result without a wire round-trip; pass nil to surface a clear "tag addresses are +// required" error. func (s *Session) ReadBulk(ctx context.Context, serverHandle int32, tagAddresses []string, timeout time.Duration) ([]*BulkReadResult, error) { if tagAddresses == nil { return nil, errors.New("mxgateway: tag addresses are required") @@ -499,6 +527,9 @@ func (s *Session) ReadBulk(ctx context.Context, serverHandle int32, tagAddresses if err := ensureBulkSize("tag addresses", len(tagAddresses)); err != nil { return nil, err } + if len(tagAddresses) == 0 { + return []*BulkReadResult{}, nil + } var timeoutMs uint32 if timeout > 0 { ms := timeout.Milliseconds() diff --git a/code-reviews/Client.Go/findings.md b/code-reviews/Client.Go/findings.md index e15dab4..e06c2d0 100644 --- a/code-reviews/Client.Go/findings.md +++ b/code-reviews/Client.Go/findings.md @@ -7,7 +7,7 @@ | Review date | 2026-05-24 | | Commit reviewed | `42b0037` | | Status | Re-reviewed | -| Open findings | 6 | +| Open findings | 0 | ## Checklist coverage @@ -472,7 +472,7 @@ Each is a few lines and routes through the existing `runWithIO` entry point, so | Severity | Medium | | Category | Code organization & conventions | | Location | `clients/go/cmd/mxgw-go/main.go:398-412,417-519` | -| Status | Open | +| Status | Resolved | **Description:** Commit `8aaab82` ("Go client: port bulk read/write SDK methods + CLI subcommands") re-introduces every symptom that Client.Go-015 documented and was marked Resolved against an earlier commit: @@ -484,7 +484,7 @@ Because the surrounding test file (`main_test.go`) lost the regression tests pro **Recommendation:** Re-apply the Client.Go-015 fix on this re-added code. Drop the `secured` parameter and the `_ = secured` line (the `command` switch is the only routing key); derive the variant locally from `command`; register `-current-user-id` / `-verifier-user-id` only inside the secured branches and `-user-id` only inside Write/Write2 — so a wrong-variant flag fails with a clean `flag provided but not defined` usage error. Re-add the `TestRunWriteBulkVariantGatesSecuredFlags` table-test from the Client.Go-021 resolution so a future regression is caught by CI. -**Resolution:** Open. +**Resolution:** 2026-05-24 — Re-applied the Client.Go-015 fix. Dropped the unused `secured` parameter from `runWriteBulkVariant` and the misleading `_ = secured` line; the variant is now derived locally from `command` and gates flag registration. `-current-user-id` / `-verifier-user-id` are only registered for the secured variants and `-user-id` only for Write/Write2, so a wrong-variant flag now fails with a clean `flag provided but not defined` usage error. The four `runWrite*Bulk` wrappers were updated to match the new signature. Regression test `TestRunWriteBulkVariantGatesSecuredFlags` in `cmd/mxgw-go/main_test.go` (table-driven across all five wrong-variant flag/command pairs) was re-added; it failed pre-fix on every case ("session-id, item-handles, and values are required" reached because the flag was silently accepted), and passes post-fix with the expected `flag provided but not defined`. ### Client.Go-023 @@ -493,7 +493,7 @@ Because the surrounding test file (`main_test.go`) lost the regression tests pro | Severity | Medium | | Category | Concurrency & thread safety | | Location | `clients/go/cmd/mxgw-go/main.go:604-606,616-632` | -| Status | Open | +| Status | Resolved | **Description:** `runBenchReadBulk`'s warm-up and steady-state loops are wall-clock-only again: @@ -515,7 +515,7 @@ Neither loop checks `ctx.Done()` / `ctx.Err()`. This is exactly the shape Client **Recommendation:** Re-apply the Client.Go-018 fix: change both loop conditions to `for time.Now().Before(warmupDeadline) && ctx.Err() == nil` (and the same on `steadyDeadline`). The cross-language bench JSON shape is unchanged — the truncated window is just reported faithfully via `durationMs` / `totalCalls`. Optionally add the `signal.NotifyContext` pattern used by `runStreamAlarms` and `runGalaxyWatch` so direct Ctrl+C on the bench also short-circuits cleanly. -**Resolution:** Open. +**Resolution:** 2026-05-24 — Re-applied the Client.Go-018 fix. Both the warm-up and steady-state loops in `runBenchReadBulk` now carry an `&& ctx.Err() == nil` guard alongside the wall-clock check, so a cancelled parent context breaks the loops instead of spinning failing `ReadBulk` calls until the deadline elapses. The cross-language bench JSON shape (`durationMs` / `totalCalls`) is unchanged — the truncated window is just reported faithfully. Regression test `TestRunBenchReadBulkRespectsContextCancellation` in `cmd/mxgw-go/main_test.go` spins up a localhost TCP gRPC fake (`benchFakeGateway`) that answers OpenSession + Invoke for the register/subscribe/read/unsubscribe sequence, runs the bench with `-warmup-seconds 5 -duration-seconds 5`, cancels the ctx after 150ms, and asserts the bench returns in under 4s. Pre-fix the test ran for the full 10s (warmup+duration); post-fix it returns within ~250ms. ### Client.Go-024 @@ -524,7 +524,7 @@ Neither loop checks `ctx.Done()` / `ctx.Err()`. This is exactly the shape Client | Severity | Low | | Category | Testing coverage | | Location | `clients/go/mxgateway/session.go:395-525`, `clients/go/mxgateway/alarms.go:65-76` | -| Status | Open | +| Status | Resolved | **Description:** The five new bulk SDK methods on `Session` and the new `Client.StreamAlarms` method have **no unit tests** in `clients/go/mxgateway/`: @@ -545,7 +545,7 @@ Neither loop checks `ctx.Done()` / `ctx.Err()`. This is exactly the shape Client These plus `nil` / empty-slice rejection tests for each bulk method close out the new public surface. -**Resolution:** Open. +**Resolution:** 2026-05-24 — Added SDK-level tests using the existing `newBufconnClient` / `newBufconnClientWithAlarms` fake-gateway pattern. In `clients/go/mxgateway/client_session_test.go`: `TestWriteBulkBuildsOneBulkCommandAndReturnsPerEntryResults` confirms the protobuf payload carries `MX_COMMAND_KIND_WRITE_BULK` with all entries and returns per-entry results; `TestWriteBulkRejectsNilEntries` pins the nil guard on all five new bulk methods (WriteBulk/Write2Bulk/WriteSecuredBulk/WriteSecured2Bulk/ReadBulk); `TestReadBulkForwardsTimeoutAndUnpacksCachedFlag` pins normal `timeoutMs` arithmetic and `WasCached` propagation; `TestReadBulkSaturatesTimeoutAboveMaxUint32` pins the `> MaxUint32 ms` clamp at `session.go:504-509`. In `clients/go/mxgateway/alarms_test.go`: `TestStreamAlarmsPassesFilterPrefixAndReceivesFeedMessages` asserts request flows and stream Recv returns each fake `AlarmFeedMessage` (active-alarm snapshot, snapshot-complete sentinel) with auth metadata attached; `TestStreamAlarmsRejectsNilRequest` pins the nil guard. The `fakeGatewayWithAlarms` was extended with a `StreamAlarms` method. ### Client.Go-025 @@ -554,7 +554,7 @@ These plus `nil` / empty-slice rejection tests for each bulk method close out th | Severity | Low | | Category | Correctness & logic bugs | | Location | `clients/go/mxgateway/session.go:395-485,495-525` | -| Status | Open | +| Status | Resolved | **Description:** The five new bulk methods (`WriteBulk`, `Write2Bulk`, `WriteSecuredBulk`, `WriteSecured2Bulk`, `ReadBulk`) each guard with `if entries == nil { return error }` and an upper-bound `ensureBulkSize` check, but accept a non-nil empty slice (e.g. `[]*WriteBulkEntry{}` or `[]string{}`). The call then sends an `MX_COMMAND_KIND_WRITE_BULK` (or peer) command with zero entries across the gRPC wire to the gateway, which forwards to the worker for a no-op round trip. This is the same shape Client.Go-015 / Client.Go-021 were written against (the CLI now also accepts `mxgw-go write-bulk -item-handles , -values ,` which `parseInt32List` returns as empty without error). The pre-existing bulk methods (`AddItemBulk`, `AdviseItemBulk`, etc. at `session.go:253-343`) carry the identical pattern, so this is a long-standing convention — but it's still a real cost on the hot path. The Java / .NET / Rust / Python clients should be checked for parity if this is fixed. @@ -565,7 +565,7 @@ These plus `nil` / empty-slice rejection tests for each bulk method close out th Option 1 is cheaper for callers (one less round trip and one clearer error message) and removes the empty-list footgun for cross-language drivers that may pass empty arrays from PowerShell `,` splits. -**Resolution:** Open. +**Resolution:** 2026-05-24 — Audited the four other clients for parity: .NET (`MxGatewaySession.cs:520`), Rust (`session.rs:408` via `ensure_bulk_size`), Python (`session.py:350`), and Java (`MxGatewaySession.java:451`) all accept empty slices and make the round-trip with zero entries. To preserve cross-language behaviour (no error on empty input) while removing the wasteful round trip on the Go hot path, all five new bulk methods (`WriteBulk`, `Write2Bulk`, `WriteSecuredBulk`, `WriteSecured2Bulk`, `ReadBulk`) now short-circuit on `len(entries) == 0` and return an empty result slice without invoking the command. The nil guard is preserved (returns the "...are required" error) and the SDK doc comments now document the empty-slice no-op shape explicitly. Regression test `TestBulkMethodsShortCircuitOnEmptySliceWithoutRoundTrip` in `client_session_test.go` invokes each of the five methods with an empty slice and asserts (a) no error, (b) zero-length result, and (c) `fake.invokeRequest == nil` (no gRPC round trip). Pre-fix the test failed on the first assertion ("WriteBulk(empty) sent a round trip; expected short-circuit"); post-fix it passes. ### Client.Go-026 @@ -574,7 +574,7 @@ Option 1 is cheaper for callers (one less round trip and one clearer error messa | Severity | Low | | Category | Error handling & resilience | | Location | `clients/go/cmd/mxgw-go/main.go:1196-1222` | -| Status | Open | +| Status | Resolved | **Description:** `runBatch` reads command lines with a default `bufio.Scanner`: @@ -595,7 +595,7 @@ A second weakness: `strings.Fields(line)` splits on whitespace and does no quote **Recommendation:** Call `scanner.Buffer(make([]byte, 0, 64*1024), 16*1024*1024)` immediately after `bufio.NewScanner` so a long bulk-args line doesn't abort the session. If `runBatch` is intended to support free-text flag values (the `acknowledge-alarm -comment` shape is the obvious case), swap `strings.Fields` for a quote-aware tokeniser (`mvdan.cc/sh/v3/syntax` or a small inline state machine matching the .NET/Rust harness shape). Otherwise add a one-line comment to `runBatch`'s doc-comment that batch-mode arguments must not contain whitespace. -**Resolution:** Open. +**Resolution:** 2026-05-24 — `runBatch` now sets `scanner.Buffer(make([]byte, 0, 64*1024), 16*1024*1024)` immediately after `bufio.NewScanner`, lifting the per-line cap from 64 KiB to 16 MiB so a long bulk-args line (several thousand handles) no longer aborts the session. If a single line still exceeds the 16 MiB cap, the resulting `scanner.Err()` is now framed as a final error-with-sentinel (JSON payload + `batchEOR`) and returned, so the harness never sees an unframed bufio failure. Regression test `TestRunBatchHandlesLongCommandLine` in `cmd/mxgw-go/main_test.go` feeds an ~88 KiB `subscribe-bulk` line (above the old 64 KiB default) followed by `version --json` and asserts two EOR sentinels are emitted — pre-fix the test failed with "bufio.Scanner: token too long" returned from `runBatch`; post-fix both commands run and the session completes cleanly. Quote-aware tokenisation is out of scope for this finding (the recommendation accepts either fix); the `strings.Fields` shape is unchanged. ### Client.Go-027 @@ -604,7 +604,7 @@ A second weakness: `strings.Fields(line)` splits on whitespace and does no quote | Severity | Low | | Category | Code organization & conventions | | Location | `clients/go/cmd/mxgw-go/main.go:1195-1206` | -| Status | Open | +| Status | Resolved | **Description:** `runBatch`'s doc-comment says the loop "never terminates on command error; only stdin EOF (or an empty line) ends the session", and the implementation matches: @@ -624,4 +624,4 @@ The two cases the empty-line check seems to cover — (a) operator pressing Ente **Recommendation:** Change `if line == "" { break }` to `if line == "" { continue }` (alongside the existing `len(args) == 0` continue, which is then redundant — keep one, drop the other for clarity). Update the `runBatch` doc-comment to read "only stdin EOF ends the session" and drop the "or an empty line" clause. If the interactive ergonomic is genuinely wanted, gate it on `isatty(stdin)` so the batch-from-pipe case isn't affected. -**Resolution:** Open. +**Resolution:** 2026-05-24 — `runBatch` no longer treats a blank line as end-of-session. The `if line == "" { break }` early-exit was removed; blank or whitespace-only lines now fall through the existing `if len(args) == 0 { continue }` guard (kept as the single blank-line skip rule for clarity), so only stdin EOF ends the session. The doc-comment was updated to read "Blank lines are skipped; only stdin EOF ends the session." Regression test `TestRunBatchSkipsBlankLinesAndContinuesUntilEOF` in `cmd/mxgw-go/main_test.go` feeds `version --json\n\nversion --json\n` (a stray blank line between two commands) and asserts two EOR sentinels are emitted — pre-fix the test failed with "EOR sentinel count = 1, want 2" because the blank line broke the loop and the second command never ran; post-fix both commands run.