diff --git a/clients/go/README.md b/clients/go/README.md index 1516e45..497409a 100644 --- a/clients/go/README.md +++ b/clients/go/README.md @@ -79,7 +79,13 @@ client, err := mxgateway.Dial(ctx, mxgateway.Options{ `AddItem`, `AddItem2`, `Advise`, `Write`, `Events`, and `Close`. Prefer `SubscribeEvents` or `SubscribeEventsAfter` for long-running streams because the returned subscription owns cancellation and exposes `Close` for deterministic -goroutine cleanup. Raw protobuf messages remain available through the +goroutine cleanup. `Events` and `EventsAfter` are a compatibility shim with a +bounded internal buffer: if the consumer drains too slowly the buffer fills, +the underlying stream is cancelled, and a terminal `EventResult` carrying +`ErrEventBufferOverflow` is delivered as the channel's last item before it +closes — so a slow consumer can distinguish dropped events from a normal +end-of-stream. `SubscribeEvents` blocks instead of dropping, so use it when no +events may be lost. Raw protobuf messages remain available through the `mxgateway` package aliases and the `Raw` helper methods. Typed errors support `errors.As` for `GatewayError`, `CommandError`, and `MxAccessError`; command errors preserve the raw reply. 
diff --git a/clients/go/cmd/mxgw-go/main.go b/clients/go/cmd/mxgw-go/main.go index 0fff337..4ca977a 100644 --- a/clients/go/cmd/mxgw-go/main.go +++ b/clients/go/cmd/mxgw-go/main.go @@ -331,6 +331,11 @@ func runUnsubscribeBulk(ctx context.Context, args []string, stdout, stderr io.Wr return errors.New("session-id and item-handles are required") } + handles, err := parseInt32List(*itemHandles) + if err != nil { + return err + } + client, options, err := dialForCommand(ctx, common) if err != nil { return err @@ -338,7 +343,7 @@ func runUnsubscribeBulk(ctx context.Context, args []string, stdout, stderr io.Wr defer client.Close() session := mxgateway.NewSessionForID(client, *sessionID) - results, err := session.UnsubscribeBulk(ctx, int32(*serverHandle), parseInt32List(*itemHandles)) + results, err := session.UnsubscribeBulk(ctx, int32(*serverHandle), handles) return writeBulkOutput(stdout, *jsonOutput, "unsubscribe-bulk", options, results, err) } @@ -514,7 +519,7 @@ func parseStringList(value string) []string { return items } -func parseInt32List(value string) []int32 { +func parseInt32List(value string) ([]int32, error) { parts := strings.Split(value, ",") items := make([]int32, 0, len(parts)) for _, part := range parts { @@ -524,11 +529,11 @@ func parseInt32List(value string) []int32 { } parsed, err := strconv.ParseInt(item, 10, 32) if err != nil { - panic(err) + return nil, fmt.Errorf("invalid item handle %q: %w", item, err) } items = append(items, int32(parsed)) } - return items + return items, nil } func bindCommonFlags(flags *flag.FlagSet) *commonOptions { diff --git a/clients/go/cmd/mxgw-go/main_test.go b/clients/go/cmd/mxgw-go/main_test.go index 945cf09..f5f5604 100644 --- a/clients/go/cmd/mxgw-go/main_test.go +++ b/clients/go/cmd/mxgw-go/main_test.go @@ -56,3 +56,32 @@ func TestParseValueBuildsTypedValue(t *testing.T) { t.Fatalf("int32 value = %d, want 123", got) } } + +func TestParseInt32ListParsesValidTokens(t *testing.T) { + items, err := parseInt32List("1, 2 
,3") + if err != nil { + t.Fatalf("parseInt32List() error = %v", err) + } + want := []int32{1, 2, 3} + if len(items) != len(want) { + t.Fatalf("parseInt32List() = %v, want %v", items, want) + } + for i := range want { + if items[i] != want[i] { + t.Fatalf("parseInt32List()[%d] = %d, want %d", i, items[i], want[i]) + } + } +} + +func TestParseInt32ListReturnsErrorOnMalformedToken(t *testing.T) { + items, err := parseInt32List("1,foo") + if err == nil { + t.Fatalf("parseInt32List() error = nil, want a parse error; items = %v", items) + } + if items != nil { + t.Fatalf("parseInt32List() items = %v, want nil on error", items) + } + if !strings.Contains(err.Error(), "foo") { + t.Fatalf("parseInt32List() error = %q, want it to name the bad token", err.Error()) + } +} diff --git a/clients/go/mxgateway/client_session_test.go b/clients/go/mxgateway/client_session_test.go index b1577b4..17c3ff5 100644 --- a/clients/go/mxgateway/client_session_test.go +++ b/clients/go/mxgateway/client_session_test.go @@ -117,7 +117,7 @@ func TestEventsAfterCancelsStreamWhenCompatibilityChannelIsAbandoned(t *testing. fake := &fakeGatewayServer{ streamStarted: make(chan struct{}), streamDone: make(chan struct{}), - streamEventCount: 64, + streamEventCount: 256, } client, cleanup := newBufconnClient(t, fake) defer cleanup() @@ -135,12 +135,25 @@ func TestEventsAfterCancelsStreamWhenCompatibilityChannelIsAbandoned(t *testing. t.Fatal("compatibility event stream did not stop after result channel filled") } + // A slow consumer that abandons the buffer must still receive an explicit + // terminal overflow error before the channel closes, so it can tell + // "events dropped" apart from "stream ended normally". 
// ErrEventBufferOverflow is the terminal error delivered on the compatibility
// event channel returned by Session.Events / Session.EventsAfter when a slow
// consumer lets the bounded result buffer fill. It signals that the stream was
// cancelled and events were dropped, so a consumer can tell an overflow apart
// from a normal end-of-stream.
//
// It is reported in the Err field of an EventResult placed on the channel
// before the channel closes; match it with errors.Is. Use
// Session.SubscribeEvents to block instead of dropping.
var ErrEventBufferOverflow = errors.New("mxgateway: event buffer overflow; compatibility stream cancelled and events dropped")
diff --git a/clients/go/mxgateway/session.go b/clients/go/mxgateway/session.go index 8e00fd1..4959f78 100644 --- a/clients/go/mxgateway/session.go +++ b/clients/go/mxgateway/session.go @@ -490,7 +490,7 @@ func ensureBulkSize(name string, length int) error { func sendEventResult( ctx context.Context, - results chan<- EventResult, + results chan EventResult, result EventResult, cancelWhenBufferFull bool, cancel context.CancelFunc, @@ -502,7 +502,12 @@ func sendEventResult( case <-ctx.Done(): return false default: + // The bounded compatibility buffer is full. Cancel the stream and + // deliver an explicit terminal overflow error so a slow consumer + // can tell dropped events apart from a normal end-of-stream, + // rather than seeing the channel close silently. cancel() + deliverTerminalResult(results, EventResult{Err: ErrEventBufferOverflow}) return false } } @@ -515,6 +520,25 @@ func sendEventResult( } } +// deliverTerminalResult places result on a full buffered channel by evicting +// one of the oldest buffered events to make room. The caller closes results +// afterwards, so the terminal result becomes the consumer's last item. +func deliverTerminalResult(results chan EventResult, result EventResult) { + for { + select { + case results <- result: + return + default: + } + select { + case <-results: + default: + // Another receiver drained the channel between the send and + // receive attempts; retry the send. 
+ } + } +} + func (s *Session) invokeCommand(ctx context.Context, command *MxCommand) (*MxCommandReply, error) { return s.client.Invoke(ctx, &pb.MxCommandRequest{ SessionId: s.ID(), diff --git a/code-reviews/Client.Go/findings.md b/code-reviews/Client.Go/findings.md index 212a8a3..a158f4e 100644 --- a/code-reviews/Client.Go/findings.md +++ b/code-reviews/Client.Go/findings.md @@ -7,7 +7,7 @@ | Review date | 2026-05-18 | | Commit reviewed | `3cc53a8` | | Status | Reviewed | -| Open findings | 9 | +| Open findings | 7 | ## Checklist coverage @@ -48,13 +48,13 @@ | Severity | Medium | | Category | Error handling & resilience | | Location | `clients/go/mxgateway/session.go:440-516` | -| Status | Open | +| Status | Resolved | **Description:** For the `Events`/`EventsAfter` compatibility API (`cancelWhenResultBufferFull == true`), when the 16-slot `results` channel is full `sendEventResult` cancels and returns `false`; the goroutine returns and `close(results)` runs — the consumer sees the channel close with **no `EventResult{Err: ...}` ever delivered**. A slow consumer cannot distinguish "stream ended normally" from "events were silently dropped." This contradicts the design doc's "libraries should not reorder, coalesce, or drop events by default", and a test currently pins this lossy behaviour. **Recommendation:** Before cancelling on a full buffer, deliver a terminal `EventResult` carrying an explicit error (e.g. `ErrEventBufferOverflow`). Document the behaviour on `Session.Events`; steer callers to `SubscribeEvents` (which blocks instead of dropping). -**Resolution:** _(open)_ +**Resolution:** Resolved 2026-05-18: confirmed against source — on a full bounded buffer the compatibility path cancelled and closed `results` with no terminal result. 
Added the exported sentinel `ErrEventBufferOverflow` (`errors.go`); `sendEventResult` now, on a full buffer, cancels the stream then calls the new `deliverTerminalResult` helper, which evicts one of the oldest buffered events to make room and places `EventResult{Err: ErrEventBufferOverflow}` so it becomes the consumer's last item before the channel closes. The previously lossy regression test (`TestEventsAfterCancelsStreamWhenCompatibilityChannelIsAbandoned`) was re-pointed to assert the terminal `ErrEventBufferOverflow` result is delivered. `clients/go/README.md` now documents the bounded-buffer/overflow behaviour and steers no-loss callers to `SubscribeEvents`. ### Client.Go-003 @@ -63,13 +63,13 @@ | Severity | Medium | | Category | Correctness & logic bugs | | Location | `clients/go/cmd/mxgw-go/main.go:517-532` | -| Status | Open | +| Status | Resolved | **Description:** `parseInt32List` calls `panic(err)` when an `item-handles` token fails to parse as an int32. The CLI is a documented user-facing tool; a typo like `-item-handles 1,foo` crashes the process with an unrecovered panic and stack trace instead of returning a clean error and exit code 2 like every other validation path in `main.go`. **Recommendation:** Change `parseInt32List` to return `([]int32, error)` and have `runUnsubscribeBulk` propagate the error, matching `parseValue`'s pattern. -**Resolution:** _(open)_ +**Resolution:** Resolved 2026-05-18: confirmed against source — `parseInt32List` called `panic(err)` on a malformed token. It now returns `([]int32, error)`, wrapping the bad token (`invalid item handle %q: %w`); `runUnsubscribeBulk` parses item handles before dialing and returns the error, so a typo flows through `runWithIO` to `os.Exit(2)` like other validation paths. Regression tests `TestParseInt32ListParsesValidTokens` and `TestParseInt32ListReturnsErrorOnMalformedToken` added to `cmd/mxgw-go/main_test.go`. ### Client.Go-004