package mxgateway import ( "context" "crypto/rand" "encoding/hex" "errors" "io" "sync" pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) // EventResult carries either the next ordered event or a terminal stream error. type EventResult struct { Event *MxEvent Err error } // Session represents one gateway-backed MXAccess session. type Session struct { client *Client openReply *OpenSessionReply closeMu sync.Mutex closeReply *CloseSessionReply } func newSession(client *Client, openReply *OpenSessionReply) *Session { return &Session{ client: client, openReply: openReply, } } // NewSessionForID creates a session wrapper for commands against an existing // gateway session id. func NewSessionForID(client *Client, sessionID string) *Session { return newSession(client, &pb.OpenSessionReply{SessionId: sessionID}) } // ID returns the gateway session identifier. func (s *Session) ID() string { return s.openReply.GetSessionId() } // OpenReply returns the raw OpenSession reply. func (s *Session) OpenReply() *OpenSessionReply { return s.openReply } // Close closes the gateway session once and returns the raw close reply. func (s *Session) Close(ctx context.Context) (*CloseSessionReply, error) { s.closeMu.Lock() defer s.closeMu.Unlock() if s.closeReply != nil { return s.closeReply, nil } reply, err := s.client.CloseSessionRaw(ctx, &pb.CloseSessionRequest{SessionId: s.ID()}) if err != nil { return reply, err } s.closeReply = reply return reply, nil } // Register invokes MXAccess Register and returns the server handle. func (s *Session) Register(ctx context.Context, clientName string) (int32, error) { reply, err := s.RegisterRaw(ctx, clientName) if err != nil { return 0, err } if reply.GetRegister() != nil { return reply.GetRegister().GetServerHandle(), nil } return reply.GetReturnValue().GetInt32Value(), nil } // RegisterRaw invokes MXAccess Register and returns the raw reply. func (s *Session) RegisterRaw(ctx context.Context, clientName string) (*MxCommandReply, error) { if clientName == "" { return nil, errors.New("mxgateway: client name is required") } return s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_REGISTER, Payload: &pb.MxCommand_Register{ Register: &pb.RegisterCommand{ClientName: clientName}, }, }) } // Unregister invokes MXAccess Unregister. func (s *Session) Unregister(ctx context.Context, serverHandle int32) error { _, err := s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_UNREGISTER, Payload: &pb.MxCommand_Unregister{ Unregister: &pb.UnregisterCommand{ServerHandle: serverHandle}, }, }) return err } // AddItem invokes MXAccess AddItem and returns the item handle. func (s *Session) AddItem(ctx context.Context, serverHandle int32, itemDefinition string) (int32, error) { reply, err := s.AddItemRaw(ctx, serverHandle, itemDefinition) if err != nil { return 0, err } if reply.GetAddItem() != nil { return reply.GetAddItem().GetItemHandle(), nil } return reply.GetReturnValue().GetInt32Value(), nil } // AddItemRaw invokes MXAccess AddItem and returns the raw reply. func (s *Session) AddItemRaw(ctx context.Context, serverHandle int32, itemDefinition string) (*MxCommandReply, error) { if itemDefinition == "" { return nil, errors.New("mxgateway: item definition is required") } return s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_ADD_ITEM, Payload: &pb.MxCommand_AddItem{ AddItem: &pb.AddItemCommand{ ServerHandle: serverHandle, ItemDefinition: itemDefinition, }, }, }) } // AddItem2 invokes MXAccess AddItem2 and returns the item handle. func (s *Session) AddItem2(ctx context.Context, serverHandle int32, itemDefinition, itemContext string) (int32, error) { reply, err := s.AddItem2Raw(ctx, serverHandle, itemDefinition, itemContext) if err != nil { return 0, err } if reply.GetAddItem2() != nil { return reply.GetAddItem2().GetItemHandle(), nil } return reply.GetReturnValue().GetInt32Value(), nil } // AddItem2Raw invokes MXAccess AddItem2 and returns the raw reply. func (s *Session) AddItem2Raw(ctx context.Context, serverHandle int32, itemDefinition, itemContext string) (*MxCommandReply, error) { if itemDefinition == "" { return nil, errors.New("mxgateway: item definition is required") } return s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_ADD_ITEM2, Payload: &pb.MxCommand_AddItem2{ AddItem2: &pb.AddItem2Command{ ServerHandle: serverHandle, ItemDefinition: itemDefinition, ItemContext: itemContext, }, }, }) } // Advise invokes MXAccess Advise. func (s *Session) Advise(ctx context.Context, serverHandle, itemHandle int32) error { _, err := s.AdviseRaw(ctx, serverHandle, itemHandle) return err } // AdviseRaw invokes MXAccess Advise and returns the raw reply. func (s *Session) AdviseRaw(ctx context.Context, serverHandle, itemHandle int32) (*MxCommandReply, error) { return s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_ADVISE, Payload: &pb.MxCommand_Advise{ Advise: &pb.AdviseCommand{ ServerHandle: serverHandle, ItemHandle: itemHandle, }, }, }) } // Write invokes MXAccess Write. func (s *Session) Write(ctx context.Context, serverHandle, itemHandle int32, value *MxValue, userID int32) error { _, err := s.WriteRaw(ctx, serverHandle, itemHandle, value, userID) return err } // WriteRaw invokes MXAccess Write and returns the raw reply. func (s *Session) WriteRaw(ctx context.Context, serverHandle, itemHandle int32, value *MxValue, userID int32) (*MxCommandReply, error) { if value == nil { return nil, errors.New("mxgateway: write value is required") } return s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_WRITE, Payload: &pb.MxCommand_Write{ Write: &pb.WriteCommand{ ServerHandle: serverHandle, ItemHandle: itemHandle, Value: value, UserId: userID, }, }, }) } // Events streams ordered session events until the server ends the stream, // context cancellation stops Recv, or a terminal error is sent. func (s *Session) Events(ctx context.Context) (<-chan EventResult, error) { return s.EventsAfter(ctx, 0) } // EventsAfter streams ordered session events after the given worker sequence. func (s *Session) EventsAfter(ctx context.Context, afterWorkerSequence uint64) (<-chan EventResult, error) { stream, err := s.client.StreamEventsRaw(ctx, &pb.StreamEventsRequest{ SessionId: s.ID(), AfterWorkerSequence: afterWorkerSequence, }) if err != nil { return nil, err } results := make(chan EventResult, 16) go func() { defer close(results) for { event, err := stream.Recv() if err == nil { results <- EventResult{Event: event} continue } if err == io.EOF || status.Code(err) == codes.Canceled || ctx.Err() != nil { return } results <- EventResult{Err: &GatewayError{Op: "stream events", Err: err}} return } }() return results, nil } func (s *Session) invokeCommand(ctx context.Context, command *MxCommand) (*MxCommandReply, error) { return s.client.Invoke(ctx, &pb.MxCommandRequest{ SessionId: s.ID(), ClientCorrelationId: newCorrelationID(), Command: command, }) } func newCorrelationID() string { var buffer [16]byte if _, err := rand.Read(buffer[:]); err != nil { return "" } return hex.EncodeToString(buffer[:]) }