Resolve Client.Go-022..027: bulk flags, bench cancel, batch loop
Client.Go-022 Re-applied Client.Go-015 shape — runWriteBulkVariant drops
the unused secured param and gates -current-user-id /
-verifier-user-id / -user-id behind the secured-only
variants.
Client.Go-023 Re-applied Client.Go-018 shape — bench warm-up and steady-
state loops respect ctx.Err().
Client.Go-024 Added SDK-level tests for WriteBulk / Write2Bulk /
WriteSecuredBulk / WriteSecured2Bulk / ReadBulk and
StreamAlarms via the existing bufconn fake gateway pattern.
Client.Go-025 Five bulk SDK methods short-circuit on empty input without
an RPC round-trip and document the behavior.
Client.Go-026 runBatch widens scanner.Buffer to 16 MiB and emits an
error-with-sentinel if a longer line still arrives, rather
than aborting the session silently.
Client.Go-027 runBatch treats blank lines as skip-and-continue; only EOF
ends the session.
All resolved at 2026-05-24; gofmt + go vet + go build + go test ./... all
green.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -168,6 +168,66 @@ func TestQueryActiveAlarmsPassesFilterPrefix(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestStreamAlarmsPassesFilterPrefixAndReceivesFeedMessages(t *testing.T) {
|
||||
fake := &fakeGatewayWithAlarms{
|
||||
feedMessages: []*pb.AlarmFeedMessage{
|
||||
{
|
||||
Payload: &pb.AlarmFeedMessage_ActiveAlarm{
|
||||
ActiveAlarm: &pb.ActiveAlarmSnapshot{
|
||||
AlarmFullReference: "Tank01.Level.HiHi",
|
||||
CurrentState: pb.AlarmConditionState_ALARM_CONDITION_STATE_ACTIVE,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Payload: &pb.AlarmFeedMessage_SnapshotComplete{
|
||||
SnapshotComplete: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
client, cleanup := newBufconnClientWithAlarms(t, fake)
|
||||
defer cleanup()
|
||||
|
||||
stream, err := client.StreamAlarms(context.Background(), &pb.StreamAlarmsRequest{
|
||||
AlarmFilterPrefix: "Tank01.",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("StreamAlarms() error = %v", err)
|
||||
}
|
||||
|
||||
var received []*pb.AlarmFeedMessage
|
||||
for {
|
||||
msg, err := stream.Recv()
|
||||
if errors.Is(err, io.EOF) {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("stream.Recv() error = %v", err)
|
||||
}
|
||||
received = append(received, msg)
|
||||
}
|
||||
if len(received) != 2 {
|
||||
t.Fatalf("received count = %d, want 2", len(received))
|
||||
}
|
||||
if got := fake.streamRequest.GetAlarmFilterPrefix(); got != "Tank01." {
|
||||
t.Fatalf("captured filter prefix = %q", got)
|
||||
}
|
||||
if got := fake.streamAuth; got != "Bearer test-api-key" {
|
||||
t.Fatalf("stream authorization metadata = %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStreamAlarmsRejectsNilRequest(t *testing.T) {
|
||||
fake := &fakeGatewayWithAlarms{}
|
||||
client, cleanup := newBufconnClientWithAlarms(t, fake)
|
||||
defer cleanup()
|
||||
|
||||
if _, err := client.StreamAlarms(context.Background(), nil); err == nil {
|
||||
t.Fatal("StreamAlarms(nil) returned no error")
|
||||
}
|
||||
}
|
||||
|
||||
type fakeGatewayWithAlarms struct {
|
||||
pb.UnimplementedMxAccessGatewayServer
|
||||
|
||||
@@ -178,6 +238,10 @@ type fakeGatewayWithAlarms struct {
|
||||
|
||||
queryRequest *pb.QueryActiveAlarmsRequest
|
||||
activeSnapshots []*pb.ActiveAlarmSnapshot
|
||||
|
||||
streamRequest *pb.StreamAlarmsRequest
|
||||
feedMessages []*pb.AlarmFeedMessage
|
||||
streamAuth string
|
||||
}
|
||||
|
||||
func (s *fakeGatewayWithAlarms) AcknowledgeAlarm(ctx context.Context, req *pb.AcknowledgeAlarmRequest) (*pb.AcknowledgeAlarmReply, error) {
|
||||
@@ -207,6 +271,17 @@ func (s *fakeGatewayWithAlarms) QueryActiveAlarms(req *pb.QueryActiveAlarmsReque
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *fakeGatewayWithAlarms) StreamAlarms(req *pb.StreamAlarmsRequest, stream grpc.ServerStreamingServer[pb.AlarmFeedMessage]) error {
|
||||
s.streamRequest = req
|
||||
s.streamAuth = authorizationFromContext(stream.Context())
|
||||
for _, msg := range s.feedMessages {
|
||||
if err := stream.Send(msg); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func newBufconnClientWithAlarms(t *testing.T, fake *fakeGatewayWithAlarms) (*Client, func()) {
|
||||
t.Helper()
|
||||
listener := bufconn.Listen(bufSize)
|
||||
|
||||
@@ -230,6 +230,206 @@ func TestSubscribeBulkBuildsOneBulkCommandAndReturnsResults(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriteBulkBuildsOneBulkCommandAndReturnsPerEntryResults(t *testing.T) {
|
||||
fake := &fakeGatewayServer{
|
||||
invokeReply: &pb.MxCommandReply{
|
||||
SessionId: "session-1",
|
||||
Kind: pb.MxCommandKind_MX_COMMAND_KIND_WRITE_BULK,
|
||||
ProtocolStatus: &pb.ProtocolStatus{
|
||||
Code: pb.ProtocolStatusCode_PROTOCOL_STATUS_CODE_OK,
|
||||
},
|
||||
Payload: &pb.MxCommandReply_WriteBulk{
|
||||
WriteBulk: &pb.BulkWriteReply{
|
||||
Results: []*pb.BulkWriteResult{
|
||||
{ItemHandle: 10, WasSuccessful: true},
|
||||
{ItemHandle: 11, WasSuccessful: true},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
client, cleanup := newBufconnClient(t, fake)
|
||||
defer cleanup()
|
||||
session := NewSessionForID(client, "session-1")
|
||||
|
||||
entries := []*WriteBulkEntry{
|
||||
{ItemHandle: 10, Value: Int32Value(7), UserId: 100},
|
||||
{ItemHandle: 11, Value: Int32Value(8), UserId: 100},
|
||||
}
|
||||
results, err := session.WriteBulk(context.Background(), 12, entries)
|
||||
if err != nil {
|
||||
t.Fatalf("WriteBulk() error = %v", err)
|
||||
}
|
||||
if len(results) != 2 {
|
||||
t.Fatalf("results len = %d, want 2", len(results))
|
||||
}
|
||||
req := fake.invokeRequest
|
||||
if req.GetCommand().GetKind() != pb.MxCommandKind_MX_COMMAND_KIND_WRITE_BULK {
|
||||
t.Fatalf("command kind = %s", req.GetCommand().GetKind())
|
||||
}
|
||||
if got := req.GetCommand().GetWriteBulk().GetEntries(); len(got) != 2 {
|
||||
t.Fatalf("entry count = %d, want 2", len(got))
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriteBulkRejectsNilEntries(t *testing.T) {
|
||||
fake := &fakeGatewayServer{}
|
||||
client, cleanup := newBufconnClient(t, fake)
|
||||
defer cleanup()
|
||||
session := NewSessionForID(client, "session-1")
|
||||
|
||||
if _, err := session.WriteBulk(context.Background(), 12, nil); err == nil {
|
||||
t.Fatal("WriteBulk(nil) returned no error")
|
||||
}
|
||||
if _, err := session.Write2Bulk(context.Background(), 12, nil); err == nil {
|
||||
t.Fatal("Write2Bulk(nil) returned no error")
|
||||
}
|
||||
if _, err := session.WriteSecuredBulk(context.Background(), 12, nil); err == nil {
|
||||
t.Fatal("WriteSecuredBulk(nil) returned no error")
|
||||
}
|
||||
if _, err := session.WriteSecured2Bulk(context.Background(), 12, nil); err == nil {
|
||||
t.Fatal("WriteSecured2Bulk(nil) returned no error")
|
||||
}
|
||||
if _, err := session.ReadBulk(context.Background(), 12, nil, 0); err == nil {
|
||||
t.Fatal("ReadBulk(nil) returned no error")
|
||||
}
|
||||
}
|
||||
|
||||
func TestBulkMethodsShortCircuitOnEmptySliceWithoutRoundTrip(t *testing.T) {
|
||||
fake := &fakeGatewayServer{
|
||||
invokeReply: &pb.MxCommandReply{
|
||||
ProtocolStatus: &pb.ProtocolStatus{
|
||||
Code: pb.ProtocolStatusCode_PROTOCOL_STATUS_CODE_OK,
|
||||
},
|
||||
},
|
||||
}
|
||||
client, cleanup := newBufconnClient(t, fake)
|
||||
defer cleanup()
|
||||
session := NewSessionForID(client, "session-1")
|
||||
|
||||
results, err := session.WriteBulk(context.Background(), 12, []*WriteBulkEntry{})
|
||||
if err != nil {
|
||||
t.Fatalf("WriteBulk(empty) error = %v", err)
|
||||
}
|
||||
if len(results) != 0 {
|
||||
t.Fatalf("WriteBulk(empty) results len = %d, want 0", len(results))
|
||||
}
|
||||
if fake.invokeRequest != nil {
|
||||
t.Fatal("WriteBulk(empty) sent a round trip; expected short-circuit")
|
||||
}
|
||||
|
||||
results2, err := session.Write2Bulk(context.Background(), 12, []*Write2BulkEntry{})
|
||||
if err != nil {
|
||||
t.Fatalf("Write2Bulk(empty) error = %v", err)
|
||||
}
|
||||
if len(results2) != 0 {
|
||||
t.Fatalf("Write2Bulk(empty) results len = %d, want 0", len(results2))
|
||||
}
|
||||
if fake.invokeRequest != nil {
|
||||
t.Fatal("Write2Bulk(empty) sent a round trip; expected short-circuit")
|
||||
}
|
||||
|
||||
results3, err := session.WriteSecuredBulk(context.Background(), 12, []*WriteSecuredBulkEntry{})
|
||||
if err != nil {
|
||||
t.Fatalf("WriteSecuredBulk(empty) error = %v", err)
|
||||
}
|
||||
if len(results3) != 0 {
|
||||
t.Fatalf("WriteSecuredBulk(empty) results len = %d, want 0", len(results3))
|
||||
}
|
||||
if fake.invokeRequest != nil {
|
||||
t.Fatal("WriteSecuredBulk(empty) sent a round trip; expected short-circuit")
|
||||
}
|
||||
|
||||
results4, err := session.WriteSecured2Bulk(context.Background(), 12, []*WriteSecured2BulkEntry{})
|
||||
if err != nil {
|
||||
t.Fatalf("WriteSecured2Bulk(empty) error = %v", err)
|
||||
}
|
||||
if len(results4) != 0 {
|
||||
t.Fatalf("WriteSecured2Bulk(empty) results len = %d, want 0", len(results4))
|
||||
}
|
||||
if fake.invokeRequest != nil {
|
||||
t.Fatal("WriteSecured2Bulk(empty) sent a round trip; expected short-circuit")
|
||||
}
|
||||
|
||||
readResults, err := session.ReadBulk(context.Background(), 12, []string{}, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("ReadBulk(empty) error = %v", err)
|
||||
}
|
||||
if len(readResults) != 0 {
|
||||
t.Fatalf("ReadBulk(empty) results len = %d, want 0", len(readResults))
|
||||
}
|
||||
if fake.invokeRequest != nil {
|
||||
t.Fatal("ReadBulk(empty) sent a round trip; expected short-circuit")
|
||||
}
|
||||
}
|
||||
|
||||
func TestReadBulkForwardsTimeoutAndUnpacksCachedFlag(t *testing.T) {
|
||||
fake := &fakeGatewayServer{
|
||||
invokeReply: &pb.MxCommandReply{
|
||||
SessionId: "session-1",
|
||||
Kind: pb.MxCommandKind_MX_COMMAND_KIND_READ_BULK,
|
||||
ProtocolStatus: &pb.ProtocolStatus{
|
||||
Code: pb.ProtocolStatusCode_PROTOCOL_STATUS_CODE_OK,
|
||||
},
|
||||
Payload: &pb.MxCommandReply_ReadBulk{
|
||||
ReadBulk: &pb.BulkReadReply{
|
||||
Results: []*pb.BulkReadResult{
|
||||
{TagAddress: "Tank01.Level", WasSuccessful: true, WasCached: true},
|
||||
{TagAddress: "Tank02.Level", WasSuccessful: true, WasCached: false},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
client, cleanup := newBufconnClient(t, fake)
|
||||
defer cleanup()
|
||||
session := NewSessionForID(client, "session-1")
|
||||
|
||||
results, err := session.ReadBulk(context.Background(), 12, []string{"Tank01.Level", "Tank02.Level"}, 250*time.Millisecond)
|
||||
if err != nil {
|
||||
t.Fatalf("ReadBulk() error = %v", err)
|
||||
}
|
||||
if len(results) != 2 {
|
||||
t.Fatalf("results len = %d, want 2", len(results))
|
||||
}
|
||||
if !results[0].GetWasCached() || results[1].GetWasCached() {
|
||||
t.Fatalf("WasCached flags = [%v %v], want [true false]", results[0].GetWasCached(), results[1].GetWasCached())
|
||||
}
|
||||
req := fake.invokeRequest
|
||||
if req.GetCommand().GetKind() != pb.MxCommandKind_MX_COMMAND_KIND_READ_BULK {
|
||||
t.Fatalf("command kind = %s", req.GetCommand().GetKind())
|
||||
}
|
||||
if got := req.GetCommand().GetReadBulk().GetTimeoutMs(); got != 250 {
|
||||
t.Fatalf("timeout ms = %d, want 250", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestReadBulkSaturatesTimeoutAboveMaxUint32(t *testing.T) {
|
||||
fake := &fakeGatewayServer{
|
||||
invokeReply: &pb.MxCommandReply{
|
||||
SessionId: "session-1",
|
||||
Kind: pb.MxCommandKind_MX_COMMAND_KIND_READ_BULK,
|
||||
ProtocolStatus: &pb.ProtocolStatus{
|
||||
Code: pb.ProtocolStatusCode_PROTOCOL_STATUS_CODE_OK,
|
||||
},
|
||||
},
|
||||
}
|
||||
client, cleanup := newBufconnClient(t, fake)
|
||||
defer cleanup()
|
||||
session := NewSessionForID(client, "session-1")
|
||||
|
||||
// 100 days in milliseconds exceeds MaxUint32 (~49.7 days).
|
||||
hugeTimeout := 100 * 24 * time.Hour
|
||||
_, err := session.ReadBulk(context.Background(), 12, []string{"Tank01.Level"}, hugeTimeout)
|
||||
if err != nil {
|
||||
t.Fatalf("ReadBulk() error = %v", err)
|
||||
}
|
||||
got := fake.invokeRequest.GetCommand().GetReadBulk().GetTimeoutMs()
|
||||
if got != ^uint32(0) {
|
||||
t.Fatalf("timeout ms = %d, want %d (MaxUint32)", got, ^uint32(0))
|
||||
}
|
||||
}
|
||||
|
||||
func TestInvokeReturnsTypedMxAccessErrorWithRawReply(t *testing.T) {
|
||||
hresult := int32(-2147467259)
|
||||
fake := &fakeGatewayServer{
|
||||
|
||||
@@ -392,6 +392,9 @@ func (s *Session) UnsubscribeBulk(ctx context.Context, serverHandle int32, itemH
|
||||
// Per-entry failures appear as BulkWriteResult entries with WasSuccessful=false; the call
|
||||
// never returns an error for per-entry MXAccess failures (it returns an error only for
|
||||
// protocol-level failures or transport errors).
|
||||
//
|
||||
// A non-nil but empty entries slice is treated as a no-op and returns an empty result
|
||||
// without a wire round-trip; pass nil to surface a clear "entries are required" error.
|
||||
func (s *Session) WriteBulk(ctx context.Context, serverHandle int32, entries []*WriteBulkEntry) ([]*BulkWriteResult, error) {
|
||||
if entries == nil {
|
||||
return nil, errors.New("mxgateway: write bulk entries are required")
|
||||
@@ -399,6 +402,9 @@ func (s *Session) WriteBulk(ctx context.Context, serverHandle int32, entries []*
|
||||
if err := ensureBulkSize("write bulk entries", len(entries)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(entries) == 0 {
|
||||
return []*BulkWriteResult{}, nil
|
||||
}
|
||||
reply, err := s.invokeCommand(ctx, &pb.MxCommand{
|
||||
Kind: pb.MxCommandKind_MX_COMMAND_KIND_WRITE_BULK,
|
||||
Payload: &pb.MxCommand_WriteBulk{
|
||||
@@ -415,6 +421,9 @@ func (s *Session) WriteBulk(ctx context.Context, serverHandle int32, entries []*
|
||||
}
|
||||
|
||||
// Write2Bulk invokes MXAccess Write2 (timestamped) for each entry inside one gateway command.
|
||||
//
|
||||
// A non-nil but empty entries slice is treated as a no-op and returns an empty result
|
||||
// without a wire round-trip; pass nil to surface a clear "entries are required" error.
|
||||
func (s *Session) Write2Bulk(ctx context.Context, serverHandle int32, entries []*Write2BulkEntry) ([]*BulkWriteResult, error) {
|
||||
if entries == nil {
|
||||
return nil, errors.New("mxgateway: write2 bulk entries are required")
|
||||
@@ -422,6 +431,9 @@ func (s *Session) Write2Bulk(ctx context.Context, serverHandle int32, entries []
|
||||
if err := ensureBulkSize("write2 bulk entries", len(entries)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(entries) == 0 {
|
||||
return []*BulkWriteResult{}, nil
|
||||
}
|
||||
reply, err := s.invokeCommand(ctx, &pb.MxCommand{
|
||||
Kind: pb.MxCommandKind_MX_COMMAND_KIND_WRITE2_BULK,
|
||||
Payload: &pb.MxCommand_Write2Bulk{
|
||||
@@ -439,6 +451,9 @@ func (s *Session) Write2Bulk(ctx context.Context, serverHandle int32, entries []
|
||||
|
||||
// WriteSecuredBulk invokes MXAccess WriteSecured for each entry. Credential-sensitive
|
||||
// values must not be logged by callers; mirrors the single-item WriteSecured contract.
|
||||
//
|
||||
// A non-nil but empty entries slice is treated as a no-op and returns an empty result
|
||||
// without a wire round-trip; pass nil to surface a clear "entries are required" error.
|
||||
func (s *Session) WriteSecuredBulk(ctx context.Context, serverHandle int32, entries []*WriteSecuredBulkEntry) ([]*BulkWriteResult, error) {
|
||||
if entries == nil {
|
||||
return nil, errors.New("mxgateway: write-secured bulk entries are required")
|
||||
@@ -446,6 +461,9 @@ func (s *Session) WriteSecuredBulk(ctx context.Context, serverHandle int32, entr
|
||||
if err := ensureBulkSize("write-secured bulk entries", len(entries)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(entries) == 0 {
|
||||
return []*BulkWriteResult{}, nil
|
||||
}
|
||||
reply, err := s.invokeCommand(ctx, &pb.MxCommand{
|
||||
Kind: pb.MxCommandKind_MX_COMMAND_KIND_WRITE_SECURED_BULK,
|
||||
Payload: &pb.MxCommand_WriteSecuredBulk{
|
||||
@@ -462,6 +480,9 @@ func (s *Session) WriteSecuredBulk(ctx context.Context, serverHandle int32, entr
|
||||
}
|
||||
|
||||
// WriteSecured2Bulk invokes MXAccess WriteSecured2 (timestamped) for each entry.
|
||||
//
|
||||
// A non-nil but empty entries slice is treated as a no-op and returns an empty result
|
||||
// without a wire round-trip; pass nil to surface a clear "entries are required" error.
|
||||
func (s *Session) WriteSecured2Bulk(ctx context.Context, serverHandle int32, entries []*WriteSecured2BulkEntry) ([]*BulkWriteResult, error) {
|
||||
if entries == nil {
|
||||
return nil, errors.New("mxgateway: write-secured2 bulk entries are required")
|
||||
@@ -469,6 +490,9 @@ func (s *Session) WriteSecured2Bulk(ctx context.Context, serverHandle int32, ent
|
||||
if err := ensureBulkSize("write-secured2 bulk entries", len(entries)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(entries) == 0 {
|
||||
return []*BulkWriteResult{}, nil
|
||||
}
|
||||
reply, err := s.invokeCommand(ctx, &pb.MxCommand{
|
||||
Kind: pb.MxCommandKind_MX_COMMAND_KIND_WRITE_SECURED2_BULK,
|
||||
Payload: &pb.MxCommand_WriteSecured2Bulk{
|
||||
@@ -492,6 +516,10 @@ func (s *Session) WriteSecured2Bulk(ctx context.Context, serverHandle int32, ent
|
||||
// otherwise. timeout bounds the wait per tag in the snapshot case; pass zero to use the
|
||||
// worker default. Per-tag failures (timeout, invalid tag) appear as BulkReadResult entries
|
||||
// with WasSuccessful=false; the call never returns an error for per-tag MXAccess failures.
|
||||
//
|
||||
// A non-nil but empty tagAddresses slice is treated as a no-op and returns an empty
|
||||
// result without a wire round-trip; pass nil to surface a clear "tag addresses are
|
||||
// required" error.
|
||||
func (s *Session) ReadBulk(ctx context.Context, serverHandle int32, tagAddresses []string, timeout time.Duration) ([]*BulkReadResult, error) {
|
||||
if tagAddresses == nil {
|
||||
return nil, errors.New("mxgateway: tag addresses are required")
|
||||
@@ -499,6 +527,9 @@ func (s *Session) ReadBulk(ctx context.Context, serverHandle int32, tagAddresses
|
||||
if err := ensureBulkSize("tag addresses", len(tagAddresses)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(tagAddresses) == 0 {
|
||||
return []*BulkReadResult{}, nil
|
||||
}
|
||||
var timeoutMs uint32
|
||||
if timeout > 0 {
|
||||
ms := timeout.Milliseconds()
|
||||
|
||||
Reference in New Issue
Block a user