Files
mxaccessgw/clients/go/mxgateway/session.go
T
Joseph Doherty 555fe4c0ba Resolve Client.Go-004..010 code-review findings
Client.Go-004: ran gofmt on alarms_test.go and galaxy_test.go; the tree is
now gofmt-clean.

Client.Go-005/009/010: migrated Dial/DialGalaxy off the deprecated
grpc.DialContext/WithBlock to grpc.NewClient via a shared dial helper, with
a DialTimeout-bounded readiness probe to keep fail-fast semantics; shared
callContext deadline arithmetic; updated the stale Dial doc comment. Test
harnesses use passthrough:///bufnet for the NewClient default-scheme change.

Client.Go-006: added GatewayError.Code() and an IsTransient(err) helper so
callers can classify transient gRPC failures.

Client.Go-007: newCorrelationID no longer returns an empty id when
crypto/rand fails — it falls back to a non-empty time+counter id.

Client.Go-008: added coverage_test.go for transport-credential resolution,
callContext deadline arithmetic, and native value/array edge kinds.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 22:42:33 -04:00

574 lines
17 KiB
Go

package mxgateway
import (
"context"
"crypto/rand"
"encoding/hex"
"errors"
"fmt"
"io"
"sync"
"sync/atomic"
"time"
pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
const maxBulkItems = 1000
// EventResult carries either the next ordered event or a terminal stream error.
type EventResult struct {
// Event is the next event from the stream when Err is nil.
Event *MxEvent
// Err is the terminal stream error; when non-nil no further results follow.
Err error
}
// EventSubscription owns a running gateway event stream.
type EventSubscription struct {
results <-chan EventResult
cancel context.CancelFunc
done <-chan struct{}
once sync.Once
}
// Events returns the stream results channel.
func (s *EventSubscription) Events() <-chan EventResult {
return s.results
}
// Close cancels the stream and waits for the receive goroutine to stop.
func (s *EventSubscription) Close() {
if s == nil {
return
}
s.once.Do(func() {
s.cancel()
<-s.done
})
}
// Session represents one gateway-backed MXAccess session.
type Session struct {
client *Client
openReply *OpenSessionReply
closeMu sync.Mutex
closeReply *CloseSessionReply
}
func newSession(client *Client, openReply *OpenSessionReply) *Session {
return &Session{
client: client,
openReply: openReply,
}
}
// NewSessionForID creates a session wrapper for commands against an existing
// gateway session id.
func NewSessionForID(client *Client, sessionID string) *Session {
return newSession(client, &pb.OpenSessionReply{SessionId: sessionID})
}
// ID returns the gateway session identifier.
func (s *Session) ID() string {
return s.openReply.GetSessionId()
}
// OpenReply returns the raw OpenSession reply.
func (s *Session) OpenReply() *OpenSessionReply {
return s.openReply
}
// Close closes the gateway session once and returns the raw close reply.
func (s *Session) Close(ctx context.Context) (*CloseSessionReply, error) {
s.closeMu.Lock()
defer s.closeMu.Unlock()
if s.closeReply != nil {
return s.closeReply, nil
}
reply, err := s.client.CloseSessionRaw(ctx, &pb.CloseSessionRequest{SessionId: s.ID()})
if err != nil {
return reply, err
}
s.closeReply = reply
return reply, nil
}
// Register invokes MXAccess Register and returns the server handle.
func (s *Session) Register(ctx context.Context, clientName string) (int32, error) {
reply, err := s.RegisterRaw(ctx, clientName)
if err != nil {
return 0, err
}
if reply.GetRegister() != nil {
return reply.GetRegister().GetServerHandle(), nil
}
return reply.GetReturnValue().GetInt32Value(), nil
}
// RegisterRaw invokes MXAccess Register and returns the raw reply.
func (s *Session) RegisterRaw(ctx context.Context, clientName string) (*MxCommandReply, error) {
if clientName == "" {
return nil, errors.New("mxgateway: client name is required")
}
return s.invokeCommand(ctx, &pb.MxCommand{
Kind: pb.MxCommandKind_MX_COMMAND_KIND_REGISTER,
Payload: &pb.MxCommand_Register{
Register: &pb.RegisterCommand{ClientName: clientName},
},
})
}
// Unregister invokes MXAccess Unregister.
func (s *Session) Unregister(ctx context.Context, serverHandle int32) error {
_, err := s.invokeCommand(ctx, &pb.MxCommand{
Kind: pb.MxCommandKind_MX_COMMAND_KIND_UNREGISTER,
Payload: &pb.MxCommand_Unregister{
Unregister: &pb.UnregisterCommand{ServerHandle: serverHandle},
},
})
return err
}
// RemoveItem invokes MXAccess RemoveItem.
func (s *Session) RemoveItem(ctx context.Context, serverHandle, itemHandle int32) error {
_, err := s.RemoveItemRaw(ctx, serverHandle, itemHandle)
return err
}
// RemoveItemRaw invokes MXAccess RemoveItem and returns the raw reply.
func (s *Session) RemoveItemRaw(ctx context.Context, serverHandle, itemHandle int32) (*MxCommandReply, error) {
return s.invokeCommand(ctx, &pb.MxCommand{
Kind: pb.MxCommandKind_MX_COMMAND_KIND_REMOVE_ITEM,
Payload: &pb.MxCommand_RemoveItem{
RemoveItem: &pb.RemoveItemCommand{
ServerHandle: serverHandle,
ItemHandle: itemHandle,
},
},
})
}
// AddItem invokes MXAccess AddItem and returns the item handle.
func (s *Session) AddItem(ctx context.Context, serverHandle int32, itemDefinition string) (int32, error) {
reply, err := s.AddItemRaw(ctx, serverHandle, itemDefinition)
if err != nil {
return 0, err
}
if reply.GetAddItem() != nil {
return reply.GetAddItem().GetItemHandle(), nil
}
return reply.GetReturnValue().GetInt32Value(), nil
}
// AddItemRaw invokes MXAccess AddItem and returns the raw reply.
func (s *Session) AddItemRaw(ctx context.Context, serverHandle int32, itemDefinition string) (*MxCommandReply, error) {
if itemDefinition == "" {
return nil, errors.New("mxgateway: item definition is required")
}
return s.invokeCommand(ctx, &pb.MxCommand{
Kind: pb.MxCommandKind_MX_COMMAND_KIND_ADD_ITEM,
Payload: &pb.MxCommand_AddItem{
AddItem: &pb.AddItemCommand{
ServerHandle: serverHandle,
ItemDefinition: itemDefinition,
},
},
})
}
// AddItem2 invokes MXAccess AddItem2 and returns the item handle.
func (s *Session) AddItem2(ctx context.Context, serverHandle int32, itemDefinition, itemContext string) (int32, error) {
reply, err := s.AddItem2Raw(ctx, serverHandle, itemDefinition, itemContext)
if err != nil {
return 0, err
}
if reply.GetAddItem2() != nil {
return reply.GetAddItem2().GetItemHandle(), nil
}
return reply.GetReturnValue().GetInt32Value(), nil
}
// AddItem2Raw invokes MXAccess AddItem2 and returns the raw reply.
func (s *Session) AddItem2Raw(ctx context.Context, serverHandle int32, itemDefinition, itemContext string) (*MxCommandReply, error) {
if itemDefinition == "" {
return nil, errors.New("mxgateway: item definition is required")
}
return s.invokeCommand(ctx, &pb.MxCommand{
Kind: pb.MxCommandKind_MX_COMMAND_KIND_ADD_ITEM2,
Payload: &pb.MxCommand_AddItem2{
AddItem2: &pb.AddItem2Command{
ServerHandle: serverHandle,
ItemDefinition: itemDefinition,
ItemContext: itemContext,
},
},
})
}
// Advise invokes MXAccess Advise.
func (s *Session) Advise(ctx context.Context, serverHandle, itemHandle int32) error {
_, err := s.AdviseRaw(ctx, serverHandle, itemHandle)
return err
}
// AdviseRaw invokes MXAccess Advise and returns the raw reply.
func (s *Session) AdviseRaw(ctx context.Context, serverHandle, itemHandle int32) (*MxCommandReply, error) {
return s.invokeCommand(ctx, &pb.MxCommand{
Kind: pb.MxCommandKind_MX_COMMAND_KIND_ADVISE,
Payload: &pb.MxCommand_Advise{
Advise: &pb.AdviseCommand{
ServerHandle: serverHandle,
ItemHandle: itemHandle,
},
},
})
}
// UnAdvise invokes MXAccess UnAdvise.
func (s *Session) UnAdvise(ctx context.Context, serverHandle, itemHandle int32) error {
_, err := s.UnAdviseRaw(ctx, serverHandle, itemHandle)
return err
}
// UnAdviseRaw invokes MXAccess UnAdvise and returns the raw reply.
func (s *Session) UnAdviseRaw(ctx context.Context, serverHandle, itemHandle int32) (*MxCommandReply, error) {
return s.invokeCommand(ctx, &pb.MxCommand{
Kind: pb.MxCommandKind_MX_COMMAND_KIND_UN_ADVISE,
Payload: &pb.MxCommand_UnAdvise{
UnAdvise: &pb.UnAdviseCommand{
ServerHandle: serverHandle,
ItemHandle: itemHandle,
},
},
})
}
// AddItemBulk invokes MXAccess AddItem for each tag inside one gateway command.
func (s *Session) AddItemBulk(ctx context.Context, serverHandle int32, tagAddresses []string) ([]*SubscribeResult, error) {
if tagAddresses == nil {
return nil, errors.New("mxgateway: tag addresses are required")
}
if err := ensureBulkSize("tag addresses", len(tagAddresses)); err != nil {
return nil, err
}
reply, err := s.invokeCommand(ctx, &pb.MxCommand{
Kind: pb.MxCommandKind_MX_COMMAND_KIND_ADD_ITEM_BULK,
Payload: &pb.MxCommand_AddItemBulk{
AddItemBulk: &pb.AddItemBulkCommand{
ServerHandle: serverHandle,
TagAddresses: tagAddresses,
},
},
})
if err != nil {
return nil, err
}
return reply.GetAddItemBulk().GetResults(), nil
}
// AdviseItemBulk invokes MXAccess Advise for each item handle inside one gateway command.
func (s *Session) AdviseItemBulk(ctx context.Context, serverHandle int32, itemHandles []int32) ([]*SubscribeResult, error) {
if itemHandles == nil {
return nil, errors.New("mxgateway: item handles are required")
}
if err := ensureBulkSize("item handles", len(itemHandles)); err != nil {
return nil, err
}
reply, err := s.invokeCommand(ctx, &pb.MxCommand{
Kind: pb.MxCommandKind_MX_COMMAND_KIND_ADVISE_ITEM_BULK,
Payload: &pb.MxCommand_AdviseItemBulk{
AdviseItemBulk: &pb.AdviseItemBulkCommand{
ServerHandle: serverHandle,
ItemHandles: itemHandles,
},
},
})
if err != nil {
return nil, err
}
return reply.GetAdviseItemBulk().GetResults(), nil
}
// RemoveItemBulk invokes MXAccess RemoveItem for each item handle inside one gateway command.
func (s *Session) RemoveItemBulk(ctx context.Context, serverHandle int32, itemHandles []int32) ([]*SubscribeResult, error) {
if itemHandles == nil {
return nil, errors.New("mxgateway: item handles are required")
}
if err := ensureBulkSize("item handles", len(itemHandles)); err != nil {
return nil, err
}
reply, err := s.invokeCommand(ctx, &pb.MxCommand{
Kind: pb.MxCommandKind_MX_COMMAND_KIND_REMOVE_ITEM_BULK,
Payload: &pb.MxCommand_RemoveItemBulk{
RemoveItemBulk: &pb.RemoveItemBulkCommand{
ServerHandle: serverHandle,
ItemHandles: itemHandles,
},
},
})
if err != nil {
return nil, err
}
return reply.GetRemoveItemBulk().GetResults(), nil
}
// UnAdviseItemBulk invokes MXAccess UnAdvise for each item handle inside one gateway command.
func (s *Session) UnAdviseItemBulk(ctx context.Context, serverHandle int32, itemHandles []int32) ([]*SubscribeResult, error) {
if itemHandles == nil {
return nil, errors.New("mxgateway: item handles are required")
}
if err := ensureBulkSize("item handles", len(itemHandles)); err != nil {
return nil, err
}
reply, err := s.invokeCommand(ctx, &pb.MxCommand{
Kind: pb.MxCommandKind_MX_COMMAND_KIND_UN_ADVISE_ITEM_BULK,
Payload: &pb.MxCommand_UnAdviseItemBulk{
UnAdviseItemBulk: &pb.UnAdviseItemBulkCommand{
ServerHandle: serverHandle,
ItemHandles: itemHandles,
},
},
})
if err != nil {
return nil, err
}
return reply.GetUnAdviseItemBulk().GetResults(), nil
}
// SubscribeBulk invokes AddItem and Advise for each tag inside one gateway command.
func (s *Session) SubscribeBulk(ctx context.Context, serverHandle int32, tagAddresses []string) ([]*SubscribeResult, error) {
if tagAddresses == nil {
return nil, errors.New("mxgateway: tag addresses are required")
}
if err := ensureBulkSize("tag addresses", len(tagAddresses)); err != nil {
return nil, err
}
reply, err := s.invokeCommand(ctx, &pb.MxCommand{
Kind: pb.MxCommandKind_MX_COMMAND_KIND_SUBSCRIBE_BULK,
Payload: &pb.MxCommand_SubscribeBulk{
SubscribeBulk: &pb.SubscribeBulkCommand{
ServerHandle: serverHandle,
TagAddresses: tagAddresses,
},
},
})
if err != nil {
return nil, err
}
return reply.GetSubscribeBulk().GetResults(), nil
}
// UnsubscribeBulk invokes UnAdvise and RemoveItem for each item handle inside one gateway command.
func (s *Session) UnsubscribeBulk(ctx context.Context, serverHandle int32, itemHandles []int32) ([]*SubscribeResult, error) {
if itemHandles == nil {
return nil, errors.New("mxgateway: item handles are required")
}
if err := ensureBulkSize("item handles", len(itemHandles)); err != nil {
return nil, err
}
reply, err := s.invokeCommand(ctx, &pb.MxCommand{
Kind: pb.MxCommandKind_MX_COMMAND_KIND_UNSUBSCRIBE_BULK,
Payload: &pb.MxCommand_UnsubscribeBulk{
UnsubscribeBulk: &pb.UnsubscribeBulkCommand{
ServerHandle: serverHandle,
ItemHandles: itemHandles,
},
},
})
if err != nil {
return nil, err
}
return reply.GetUnsubscribeBulk().GetResults(), nil
}
// Write invokes MXAccess Write.
func (s *Session) Write(ctx context.Context, serverHandle, itemHandle int32, value *MxValue, userID int32) error {
_, err := s.WriteRaw(ctx, serverHandle, itemHandle, value, userID)
return err
}
// WriteRaw invokes MXAccess Write and returns the raw reply.
func (s *Session) WriteRaw(ctx context.Context, serverHandle, itemHandle int32, value *MxValue, userID int32) (*MxCommandReply, error) {
if value == nil {
return nil, errors.New("mxgateway: write value is required")
}
return s.invokeCommand(ctx, &pb.MxCommand{
Kind: pb.MxCommandKind_MX_COMMAND_KIND_WRITE,
Payload: &pb.MxCommand_Write{
Write: &pb.WriteCommand{
ServerHandle: serverHandle,
ItemHandle: itemHandle,
Value: value,
UserId: userID,
},
},
})
}
// Events streams ordered session events until the server ends the stream,
// context cancellation stops Recv, or a terminal error is sent.
func (s *Session) Events(ctx context.Context) (<-chan EventResult, error) {
return s.EventsAfter(ctx, 0)
}
// EventsAfter streams ordered session events after the given worker sequence.
func (s *Session) EventsAfter(ctx context.Context, afterWorkerSequence uint64) (<-chan EventResult, error) {
subscription, err := s.subscribeEventsAfter(ctx, afterWorkerSequence, true)
if err != nil {
return nil, err
}
return subscription.Events(), nil
}
// SubscribeEvents starts an owned event subscription.
func (s *Session) SubscribeEvents(ctx context.Context) (*EventSubscription, error) {
return s.SubscribeEventsAfter(ctx, 0)
}
// SubscribeEventsAfter starts an owned event subscription after the given worker sequence.
func (s *Session) SubscribeEventsAfter(ctx context.Context, afterWorkerSequence uint64) (*EventSubscription, error) {
return s.subscribeEventsAfter(ctx, afterWorkerSequence, false)
}
func (s *Session) subscribeEventsAfter(ctx context.Context, afterWorkerSequence uint64, cancelWhenResultBufferFull bool) (*EventSubscription, error) {
streamCtx, cancel := context.WithCancel(ctx)
stream, err := s.client.StreamEventsRaw(streamCtx, &pb.StreamEventsRequest{
SessionId: s.ID(),
AfterWorkerSequence: afterWorkerSequence,
})
if err != nil {
cancel()
return nil, err
}
results := make(chan EventResult, 16)
done := make(chan struct{})
go func() {
defer close(results)
defer close(done)
for {
event, err := stream.Recv()
if err == nil {
if !sendEventResult(streamCtx, results, EventResult{Event: event}, cancelWhenResultBufferFull, cancel) {
return
}
continue
}
if err == io.EOF || status.Code(err) == codes.Canceled || streamCtx.Err() != nil {
return
}
sendEventResult(
streamCtx,
results,
EventResult{Err: &GatewayError{Op: "stream events", Err: err}},
cancelWhenResultBufferFull,
cancel)
return
}
}()
return &EventSubscription{
results: results,
cancel: cancel,
done: done,
}, nil
}
func ensureBulkSize(name string, length int) error {
if length > maxBulkItems {
return fmt.Errorf("mxgateway: %s bulk commands are limited to %d item(s)", name, maxBulkItems)
}
return nil
}
func sendEventResult(
ctx context.Context,
results chan EventResult,
result EventResult,
cancelWhenBufferFull bool,
cancel context.CancelFunc,
) bool {
if cancelWhenBufferFull {
select {
case results <- result:
return true
case <-ctx.Done():
return false
default:
// The bounded compatibility buffer is full. Cancel the stream and
// deliver an explicit terminal overflow error so a slow consumer
// can tell dropped events apart from a normal end-of-stream,
// rather than seeing the channel close silently.
cancel()
deliverTerminalResult(results, EventResult{Err: ErrEventBufferOverflow})
return false
}
}
select {
case results <- result:
return true
case <-ctx.Done():
return false
}
}
// deliverTerminalResult places result on a full buffered channel by evicting
// one of the oldest buffered events to make room. The caller closes results
// afterwards, so the terminal result becomes the consumer's last item.
func deliverTerminalResult(results chan EventResult, result EventResult) {
for {
select {
case results <- result:
return
default:
}
select {
case <-results:
default:
// Another receiver drained the channel between the send and
// receive attempts; retry the send.
}
}
}
func (s *Session) invokeCommand(ctx context.Context, command *MxCommand) (*MxCommandReply, error) {
return s.client.Invoke(ctx, &pb.MxCommandRequest{
SessionId: s.ID(),
ClientCorrelationId: newCorrelationID(),
Command: command,
})
}
// correlationIDCounter backs the deterministic fallback id used when
// crypto/rand is unavailable, so every command still carries a unique,
// traceable correlation id.
var correlationIDCounter atomic.Uint64
// randRead is the entropy source for newCorrelationID. It is a package
// variable solely so tests can simulate a crypto/rand failure.
var randRead = rand.Read
// newCorrelationID returns a unique correlation id for an MxCommandRequest.
// It prefers 16 bytes of crypto/rand entropy; if rand.Read fails (rare) it
// falls back to a "fallback-" prefixed id built from the current time and a
// process-wide monotonic counter rather than returning an empty string, which
// would leave the command untraceable in gateway logs.
func newCorrelationID() string {
var buffer [16]byte
if _, err := randRead(buffer[:]); err != nil {
return fmt.Sprintf("fallback-%x-%x",
time.Now().UnixNano(), correlationIDCounter.Add(1))
}
return hex.EncodeToString(buffer[:])
}