261 lines
7.4 KiB
Go
261 lines
7.4 KiB
Go
package mxgateway
|
|
|
|
import (
|
|
"context"
|
|
"crypto/rand"
|
|
"encoding/hex"
|
|
"errors"
|
|
"io"
|
|
"sync"
|
|
|
|
pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
)
|
|
|
|
// EventResult carries either the next ordered event or a terminal stream error.
|
|
type EventResult struct {
|
|
Event *MxEvent
|
|
Err error
|
|
}
|
|
|
|
// Session represents one gateway-backed MXAccess session.
|
|
type Session struct {
|
|
client *Client
|
|
openReply *OpenSessionReply
|
|
closeMu sync.Mutex
|
|
closeReply *CloseSessionReply
|
|
}
|
|
|
|
func newSession(client *Client, openReply *OpenSessionReply) *Session {
|
|
return &Session{
|
|
client: client,
|
|
openReply: openReply,
|
|
}
|
|
}
|
|
|
|
// NewSessionForID creates a session wrapper for commands against an existing
|
|
// gateway session id.
|
|
func NewSessionForID(client *Client, sessionID string) *Session {
|
|
return newSession(client, &pb.OpenSessionReply{SessionId: sessionID})
|
|
}
|
|
|
|
// ID returns the gateway session identifier.
|
|
func (s *Session) ID() string {
|
|
return s.openReply.GetSessionId()
|
|
}
|
|
|
|
// OpenReply returns the raw OpenSession reply.
|
|
func (s *Session) OpenReply() *OpenSessionReply {
|
|
return s.openReply
|
|
}
|
|
|
|
// Close closes the gateway session once and returns the raw close reply.
|
|
func (s *Session) Close(ctx context.Context) (*CloseSessionReply, error) {
|
|
s.closeMu.Lock()
|
|
defer s.closeMu.Unlock()
|
|
|
|
if s.closeReply != nil {
|
|
return s.closeReply, nil
|
|
}
|
|
|
|
reply, err := s.client.CloseSessionRaw(ctx, &pb.CloseSessionRequest{SessionId: s.ID()})
|
|
if err != nil {
|
|
return reply, err
|
|
}
|
|
s.closeReply = reply
|
|
return reply, nil
|
|
}
|
|
|
|
// Register invokes MXAccess Register and returns the server handle.
|
|
func (s *Session) Register(ctx context.Context, clientName string) (int32, error) {
|
|
reply, err := s.RegisterRaw(ctx, clientName)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
if reply.GetRegister() != nil {
|
|
return reply.GetRegister().GetServerHandle(), nil
|
|
}
|
|
return reply.GetReturnValue().GetInt32Value(), nil
|
|
}
|
|
|
|
// RegisterRaw invokes MXAccess Register and returns the raw reply.
|
|
func (s *Session) RegisterRaw(ctx context.Context, clientName string) (*MxCommandReply, error) {
|
|
if clientName == "" {
|
|
return nil, errors.New("mxgateway: client name is required")
|
|
}
|
|
|
|
return s.invokeCommand(ctx, &pb.MxCommand{
|
|
Kind: pb.MxCommandKind_MX_COMMAND_KIND_REGISTER,
|
|
Payload: &pb.MxCommand_Register{
|
|
Register: &pb.RegisterCommand{ClientName: clientName},
|
|
},
|
|
})
|
|
}
|
|
|
|
// Unregister invokes MXAccess Unregister.
|
|
func (s *Session) Unregister(ctx context.Context, serverHandle int32) error {
|
|
_, err := s.invokeCommand(ctx, &pb.MxCommand{
|
|
Kind: pb.MxCommandKind_MX_COMMAND_KIND_UNREGISTER,
|
|
Payload: &pb.MxCommand_Unregister{
|
|
Unregister: &pb.UnregisterCommand{ServerHandle: serverHandle},
|
|
},
|
|
})
|
|
return err
|
|
}
|
|
|
|
// AddItem invokes MXAccess AddItem and returns the item handle.
|
|
func (s *Session) AddItem(ctx context.Context, serverHandle int32, itemDefinition string) (int32, error) {
|
|
reply, err := s.AddItemRaw(ctx, serverHandle, itemDefinition)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
if reply.GetAddItem() != nil {
|
|
return reply.GetAddItem().GetItemHandle(), nil
|
|
}
|
|
return reply.GetReturnValue().GetInt32Value(), nil
|
|
}
|
|
|
|
// AddItemRaw invokes MXAccess AddItem and returns the raw reply.
|
|
func (s *Session) AddItemRaw(ctx context.Context, serverHandle int32, itemDefinition string) (*MxCommandReply, error) {
|
|
if itemDefinition == "" {
|
|
return nil, errors.New("mxgateway: item definition is required")
|
|
}
|
|
|
|
return s.invokeCommand(ctx, &pb.MxCommand{
|
|
Kind: pb.MxCommandKind_MX_COMMAND_KIND_ADD_ITEM,
|
|
Payload: &pb.MxCommand_AddItem{
|
|
AddItem: &pb.AddItemCommand{
|
|
ServerHandle: serverHandle,
|
|
ItemDefinition: itemDefinition,
|
|
},
|
|
},
|
|
})
|
|
}
|
|
|
|
// AddItem2 invokes MXAccess AddItem2 and returns the item handle.
|
|
func (s *Session) AddItem2(ctx context.Context, serverHandle int32, itemDefinition, itemContext string) (int32, error) {
|
|
reply, err := s.AddItem2Raw(ctx, serverHandle, itemDefinition, itemContext)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
if reply.GetAddItem2() != nil {
|
|
return reply.GetAddItem2().GetItemHandle(), nil
|
|
}
|
|
return reply.GetReturnValue().GetInt32Value(), nil
|
|
}
|
|
|
|
// AddItem2Raw invokes MXAccess AddItem2 and returns the raw reply.
|
|
func (s *Session) AddItem2Raw(ctx context.Context, serverHandle int32, itemDefinition, itemContext string) (*MxCommandReply, error) {
|
|
if itemDefinition == "" {
|
|
return nil, errors.New("mxgateway: item definition is required")
|
|
}
|
|
|
|
return s.invokeCommand(ctx, &pb.MxCommand{
|
|
Kind: pb.MxCommandKind_MX_COMMAND_KIND_ADD_ITEM2,
|
|
Payload: &pb.MxCommand_AddItem2{
|
|
AddItem2: &pb.AddItem2Command{
|
|
ServerHandle: serverHandle,
|
|
ItemDefinition: itemDefinition,
|
|
ItemContext: itemContext,
|
|
},
|
|
},
|
|
})
|
|
}
|
|
|
|
// Advise invokes MXAccess Advise.
|
|
func (s *Session) Advise(ctx context.Context, serverHandle, itemHandle int32) error {
|
|
_, err := s.AdviseRaw(ctx, serverHandle, itemHandle)
|
|
return err
|
|
}
|
|
|
|
// AdviseRaw invokes MXAccess Advise and returns the raw reply.
|
|
func (s *Session) AdviseRaw(ctx context.Context, serverHandle, itemHandle int32) (*MxCommandReply, error) {
|
|
return s.invokeCommand(ctx, &pb.MxCommand{
|
|
Kind: pb.MxCommandKind_MX_COMMAND_KIND_ADVISE,
|
|
Payload: &pb.MxCommand_Advise{
|
|
Advise: &pb.AdviseCommand{
|
|
ServerHandle: serverHandle,
|
|
ItemHandle: itemHandle,
|
|
},
|
|
},
|
|
})
|
|
}
|
|
|
|
// Write invokes MXAccess Write.
|
|
func (s *Session) Write(ctx context.Context, serverHandle, itemHandle int32, value *MxValue, userID int32) error {
|
|
_, err := s.WriteRaw(ctx, serverHandle, itemHandle, value, userID)
|
|
return err
|
|
}
|
|
|
|
// WriteRaw invokes MXAccess Write and returns the raw reply.
|
|
func (s *Session) WriteRaw(ctx context.Context, serverHandle, itemHandle int32, value *MxValue, userID int32) (*MxCommandReply, error) {
|
|
if value == nil {
|
|
return nil, errors.New("mxgateway: write value is required")
|
|
}
|
|
|
|
return s.invokeCommand(ctx, &pb.MxCommand{
|
|
Kind: pb.MxCommandKind_MX_COMMAND_KIND_WRITE,
|
|
Payload: &pb.MxCommand_Write{
|
|
Write: &pb.WriteCommand{
|
|
ServerHandle: serverHandle,
|
|
ItemHandle: itemHandle,
|
|
Value: value,
|
|
UserId: userID,
|
|
},
|
|
},
|
|
})
|
|
}
|
|
|
|
// Events streams ordered session events until the server ends the stream,
|
|
// context cancellation stops Recv, or a terminal error is sent.
|
|
func (s *Session) Events(ctx context.Context) (<-chan EventResult, error) {
|
|
return s.EventsAfter(ctx, 0)
|
|
}
|
|
|
|
// EventsAfter streams ordered session events after the given worker sequence.
|
|
func (s *Session) EventsAfter(ctx context.Context, afterWorkerSequence uint64) (<-chan EventResult, error) {
|
|
stream, err := s.client.StreamEventsRaw(ctx, &pb.StreamEventsRequest{
|
|
SessionId: s.ID(),
|
|
AfterWorkerSequence: afterWorkerSequence,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
results := make(chan EventResult, 16)
|
|
go func() {
|
|
defer close(results)
|
|
for {
|
|
event, err := stream.Recv()
|
|
if err == nil {
|
|
results <- EventResult{Event: event}
|
|
continue
|
|
}
|
|
if err == io.EOF || status.Code(err) == codes.Canceled || ctx.Err() != nil {
|
|
return
|
|
}
|
|
results <- EventResult{Err: &GatewayError{Op: "stream events", Err: err}}
|
|
return
|
|
}
|
|
}()
|
|
|
|
return results, nil
|
|
}
|
|
|
|
func (s *Session) invokeCommand(ctx context.Context, command *MxCommand) (*MxCommandReply, error) {
|
|
return s.client.Invoke(ctx, &pb.MxCommandRequest{
|
|
SessionId: s.ID(),
|
|
ClientCorrelationId: newCorrelationID(),
|
|
Command: command,
|
|
})
|
|
}
|
|
|
|
// newCorrelationID returns 16 bytes read from crypto/rand, encoded as a
// 32-character lowercase hex string. If the random source reports an error,
// the empty string is returned instead.
func newCorrelationID() string {
	raw := make([]byte, 16)
	_, err := rand.Read(raw)
	if err != nil {
		return ""
	}
	return hex.EncodeToString(raw)
}
|