Resolve Client.Go-002, -003 code-review findings

Client.Go-002: the Events/EventsAfter compatibility path silently dropped
events when the 16-slot results channel filled — it cancelled the stream and
closed the channel with no error delivered. sendEventResult now evicts an
old buffered event and delivers a terminal EventResult carrying the new
exported ErrEventBufferOverflow before close, so the overflow is observable.

Client.Go-003: parseInt32List panicked on a malformed -item-handles token,
crashing the CLI with a stack trace. It now returns an error that
runUnsubscribeBulk propagates, exiting 2 with a clean message.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-18 21:31:36 -04:00
parent 8023eccfa6
commit f88a029ecc
7 changed files with 99 additions and 13 deletions
+7 -1
View File
@@ -79,7 +79,13 @@ client, err := mxgateway.Dial(ctx, mxgateway.Options{
`AddItem`, `AddItem2`, `Advise`, `Write`, `Events`, and `Close`. Prefer
`SubscribeEvents` or `SubscribeEventsAfter` for long-running streams because the
returned subscription owns cancellation and exposes `Close` for deterministic
goroutine cleanup. Raw protobuf messages remain available through the
goroutine cleanup. `Events` and `EventsAfter` are a compatibility shim with a
bounded internal buffer: if the consumer drains too slowly the buffer fills,
the underlying stream is cancelled, and a terminal `EventResult` carrying
`ErrEventBufferOverflow` is delivered as the channel's last item before it
closes — so a slow consumer can distinguish dropped events from a normal
end-of-stream. `SubscribeEvents` blocks instead of dropping, so use it when no
events may be lost. Raw protobuf messages remain available through the
`mxgateway` package aliases and the `Raw` helper methods. Typed errors support
`errors.As` for `GatewayError`, `CommandError`, and `MxAccessError`; command
errors preserve the raw reply.
+9 -4
View File
@@ -331,6 +331,11 @@ func runUnsubscribeBulk(ctx context.Context, args []string, stdout, stderr io.Wr
return errors.New("session-id and item-handles are required")
}
handles, err := parseInt32List(*itemHandles)
if err != nil {
return err
}
client, options, err := dialForCommand(ctx, common)
if err != nil {
return err
@@ -338,7 +343,7 @@ func runUnsubscribeBulk(ctx context.Context, args []string, stdout, stderr io.Wr
defer client.Close()
session := mxgateway.NewSessionForID(client, *sessionID)
results, err := session.UnsubscribeBulk(ctx, int32(*serverHandle), parseInt32List(*itemHandles))
results, err := session.UnsubscribeBulk(ctx, int32(*serverHandle), handles)
return writeBulkOutput(stdout, *jsonOutput, "unsubscribe-bulk", options, results, err)
}
@@ -514,7 +519,7 @@ func parseStringList(value string) []string {
return items
}
func parseInt32List(value string) []int32 {
func parseInt32List(value string) ([]int32, error) {
parts := strings.Split(value, ",")
items := make([]int32, 0, len(parts))
for _, part := range parts {
@@ -524,11 +529,11 @@ func parseInt32List(value string) []int32 {
}
parsed, err := strconv.ParseInt(item, 10, 32)
if err != nil {
panic(err)
return nil, fmt.Errorf("invalid item handle %q: %w", item, err)
}
items = append(items, int32(parsed))
}
return items
return items, nil
}
func bindCommonFlags(flags *flag.FlagSet) *commonOptions {
+29
View File
@@ -56,3 +56,32 @@ func TestParseValueBuildsTypedValue(t *testing.T) {
t.Fatalf("int32 value = %d, want 123", got)
}
}
func TestParseInt32ListParsesValidTokens(t *testing.T) {
items, err := parseInt32List("1, 2 ,3")
if err != nil {
t.Fatalf("parseInt32List() error = %v", err)
}
want := []int32{1, 2, 3}
if len(items) != len(want) {
t.Fatalf("parseInt32List() = %v, want %v", items, want)
}
for i := range want {
if items[i] != want[i] {
t.Fatalf("parseInt32List()[%d] = %d, want %d", i, items[i], want[i])
}
}
}
func TestParseInt32ListReturnsErrorOnMalformedToken(t *testing.T) {
items, err := parseInt32List("1,foo")
if err == nil {
t.Fatalf("parseInt32List() error = nil, want a parse error; items = %v", items)
}
if items != nil {
t.Fatalf("parseInt32List() items = %v, want nil on error", items)
}
if !strings.Contains(err.Error(), "foo") {
t.Fatalf("parseInt32List() error = %q, want it to name the bad token", err.Error())
}
}
+15 -2
View File
@@ -117,7 +117,7 @@ func TestEventsAfterCancelsStreamWhenCompatibilityChannelIsAbandoned(t *testing.
fake := &fakeGatewayServer{
streamStarted: make(chan struct{}),
streamDone: make(chan struct{}),
streamEventCount: 64,
streamEventCount: 256,
}
client, cleanup := newBufconnClient(t, fake)
defer cleanup()
@@ -135,12 +135,25 @@ func TestEventsAfterCancelsStreamWhenCompatibilityChannelIsAbandoned(t *testing.
t.Fatal("compatibility event stream did not stop after result channel filled")
}
// A slow consumer that abandons the buffer must still receive an explicit
// terminal overflow error before the channel closes, so it can tell
// "events dropped" apart from "stream ended normally".
var sawOverflow bool
for {
select {
case _, ok := <-events:
case result, ok := <-events:
if !ok {
if !sawOverflow {
t.Fatal("compatibility event channel closed without an ErrEventBufferOverflow result")
}
return
}
if result.Err != nil {
if !errors.Is(result.Err, ErrEventBufferOverflow) {
t.Fatalf("terminal result error = %v, want ErrEventBufferOverflow", result.Err)
}
sawOverflow = true
}
case <-time.After(2 * time.Second):
t.Fatal("compatibility event channel did not close")
}
+9
View File
@@ -1,11 +1,20 @@
package mxgateway
import (
"errors"
"fmt"
pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated"
)
// ErrEventBufferOverflow is the terminal error delivered on the compatibility
// event channel returned by Session.Events / Session.EventsAfter when a slow
// consumer lets the bounded result buffer fill. It signals that the stream was
// cancelled and events were dropped, so a consumer can tell an overflow apart
// from a normal end-of-stream. Use Session.SubscribeEvents to block instead of
// dropping.
var ErrEventBufferOverflow = errors.New("mxgateway: event buffer overflow; compatibility stream cancelled and events dropped")
// GatewayError wraps transport-level gRPC failures.
type GatewayError struct {
// Op names the operation that failed (for example "dial" or "invoke").
+25 -1
View File
@@ -490,7 +490,7 @@ func ensureBulkSize(name string, length int) error {
func sendEventResult(
ctx context.Context,
results chan<- EventResult,
results chan EventResult,
result EventResult,
cancelWhenBufferFull bool,
cancel context.CancelFunc,
@@ -502,7 +502,12 @@ func sendEventResult(
case <-ctx.Done():
return false
default:
// The bounded compatibility buffer is full. Cancel the stream and
// deliver an explicit terminal overflow error so a slow consumer
// can tell dropped events apart from a normal end-of-stream,
// rather than seeing the channel close silently.
cancel()
deliverTerminalResult(results, EventResult{Err: ErrEventBufferOverflow})
return false
}
}
@@ -515,6 +520,25 @@ func sendEventResult(
}
}
// deliverTerminalResult places result on a full buffered channel by evicting
// one of the oldest buffered events to make room. The caller closes results
// afterwards, so the terminal result becomes the consumer's last item.
func deliverTerminalResult(results chan EventResult, result EventResult) {
for {
select {
case results <- result:
return
default:
}
select {
case <-results:
default:
// Another receiver drained the channel between the send and
// receive attempts; retry the send.
}
}
}
func (s *Session) invokeCommand(ctx context.Context, command *MxCommand) (*MxCommandReply, error) {
return s.client.Invoke(ctx, &pb.MxCommandRequest{
SessionId: s.ID(),
+5 -5
View File
@@ -7,7 +7,7 @@
| Review date | 2026-05-18 |
| Commit reviewed | `3cc53a8` |
| Status | Reviewed |
| Open findings | 9 |
| Open findings | 7 |
## Checklist coverage
@@ -48,13 +48,13 @@
| Severity | Medium |
| Category | Error handling & resilience |
| Location | `clients/go/mxgateway/session.go:440-516` |
| Status | Open |
| Status | Resolved |
**Description:** For the `Events`/`EventsAfter` compatibility API (`cancelWhenResultBufferFull == true`), when the 16-slot `results` channel is full `sendEventResult` cancels and returns `false`; the goroutine returns and `close(results)` runs — the consumer sees the channel close with **no `EventResult{Err: ...}` ever delivered**. A slow consumer cannot distinguish "stream ended normally" from "events were silently dropped." This contradicts the design doc's "libraries should not reorder, coalesce, or drop events by default", and a test currently pins this lossy behaviour.
**Recommendation:** Before cancelling on a full buffer, deliver a terminal `EventResult` carrying an explicit error (e.g. `ErrEventBufferOverflow`). Document the behaviour on `Session.Events`; steer callers to `SubscribeEvents` (which blocks instead of dropping).
**Resolution:** _(open)_
**Resolution:** Resolved 2026-05-18: confirmed against source — on a full bounded buffer the compatibility path cancelled and closed `results` with no terminal result. Added the exported sentinel `ErrEventBufferOverflow` (`errors.go`); `sendEventResult` now, on a full buffer, cancels the stream then calls the new `deliverTerminalResult` helper, which evicts one of the oldest buffered events to make room and places `EventResult{Err: ErrEventBufferOverflow}` so it becomes the consumer's last item before the channel closes. The previously lossy regression test (`TestEventsAfterCancelsStreamWhenCompatibilityChannelIsAbandoned`) was re-pointed to assert the terminal `ErrEventBufferOverflow` result is delivered. `clients/go/README.md` now documents the bounded-buffer/overflow behaviour and steers no-loss callers to `SubscribeEvents`.
### Client.Go-003
@@ -63,13 +63,13 @@
| Severity | Medium |
| Category | Correctness & logic bugs |
| Location | `clients/go/cmd/mxgw-go/main.go:517-532` |
| Status | Open |
| Status | Resolved |
**Description:** `parseInt32List` calls `panic(err)` when an `item-handles` token fails to parse as an int32. The CLI is a documented user-facing tool; a typo like `-item-handles 1,foo` crashes the process with an unrecovered panic and stack trace instead of returning a clean error and exit code 2 like every other validation path in `main.go`.
**Recommendation:** Change `parseInt32List` to return `([]int32, error)` and have `runUnsubscribeBulk` propagate the error, matching `parseValue`'s pattern.
**Resolution:** _(open)_
**Resolution:** Resolved 2026-05-18: confirmed against source — `parseInt32List` called `panic(err)` on a malformed token. It now returns `([]int32, error)`, wrapping the bad token (`invalid item handle %q: %w`); `runUnsubscribeBulk` parses item handles before dialing and returns the error, so a typo flows through `runWithIO` to `os.Exit(2)` like other validation paths. Regression tests `TestParseInt32ListParsesValidTokens` and `TestParseInt32ListReturnsErrorOnMalformedToken` added to `cmd/mxgw-go/main_test.go`.
### Client.Go-004