package mxgateway import ( "context" "crypto/rand" "encoding/hex" "errors" "fmt" "io" "sync" "time" pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) const maxBulkItems = 1000 // EventResult carries either the next ordered event or a terminal stream error. type EventResult struct { // Event is the next event from the stream when Err is nil. Event *MxEvent // Err is the terminal stream error; when non-nil no further results follow. Err error } // EventSubscription owns a running gateway event stream. type EventSubscription struct { results <-chan EventResult cancel context.CancelFunc done <-chan struct{} once sync.Once } // Events returns the stream results channel. func (s *EventSubscription) Events() <-chan EventResult { return s.results } // Close cancels the stream and waits for the receive goroutine to stop. func (s *EventSubscription) Close() { if s == nil { return } s.once.Do(func() { s.cancel() <-s.done }) } // Session represents one gateway-backed MXAccess session. type Session struct { client *Client openReply *OpenSessionReply closeMu sync.Mutex closeReply *CloseSessionReply } func newSession(client *Client, openReply *OpenSessionReply) *Session { return &Session{ client: client, openReply: openReply, } } // NewSessionForID creates a session wrapper for commands against an existing // gateway session id. func NewSessionForID(client *Client, sessionID string) *Session { return newSession(client, &pb.OpenSessionReply{SessionId: sessionID}) } // ID returns the gateway session identifier. func (s *Session) ID() string { return s.openReply.GetSessionId() } // OpenReply returns the raw OpenSession reply. func (s *Session) OpenReply() *OpenSessionReply { return s.openReply } // Close closes the gateway session once and returns the raw close reply. func (s *Session) Close(ctx context.Context) (*CloseSessionReply, error) { s.closeMu.Lock() defer s.closeMu.Unlock() if s.closeReply != nil { return s.closeReply, nil } reply, err := s.client.CloseSessionRaw(ctx, &pb.CloseSessionRequest{SessionId: s.ID()}) if err != nil { return reply, err } s.closeReply = reply return reply, nil } // Register invokes MXAccess Register and returns the server handle. func (s *Session) Register(ctx context.Context, clientName string) (int32, error) { reply, err := s.RegisterRaw(ctx, clientName) if err != nil { return 0, err } if reply.GetRegister() != nil { return reply.GetRegister().GetServerHandle(), nil } return reply.GetReturnValue().GetInt32Value(), nil } // RegisterRaw invokes MXAccess Register and returns the raw reply. func (s *Session) RegisterRaw(ctx context.Context, clientName string) (*MxCommandReply, error) { if clientName == "" { return nil, errors.New("mxgateway: client name is required") } return s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_REGISTER, Payload: &pb.MxCommand_Register{ Register: &pb.RegisterCommand{ClientName: clientName}, }, }) } // Unregister invokes MXAccess Unregister. func (s *Session) Unregister(ctx context.Context, serverHandle int32) error { _, err := s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_UNREGISTER, Payload: &pb.MxCommand_Unregister{ Unregister: &pb.UnregisterCommand{ServerHandle: serverHandle}, }, }) return err } // RemoveItem invokes MXAccess RemoveItem. func (s *Session) RemoveItem(ctx context.Context, serverHandle, itemHandle int32) error { _, err := s.RemoveItemRaw(ctx, serverHandle, itemHandle) return err } // RemoveItemRaw invokes MXAccess RemoveItem and returns the raw reply. func (s *Session) RemoveItemRaw(ctx context.Context, serverHandle, itemHandle int32) (*MxCommandReply, error) { return s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_REMOVE_ITEM, Payload: &pb.MxCommand_RemoveItem{ RemoveItem: &pb.RemoveItemCommand{ ServerHandle: serverHandle, ItemHandle: itemHandle, }, }, }) } // AddItem invokes MXAccess AddItem and returns the item handle. func (s *Session) AddItem(ctx context.Context, serverHandle int32, itemDefinition string) (int32, error) { reply, err := s.AddItemRaw(ctx, serverHandle, itemDefinition) if err != nil { return 0, err } if reply.GetAddItem() != nil { return reply.GetAddItem().GetItemHandle(), nil } return reply.GetReturnValue().GetInt32Value(), nil } // AddItemRaw invokes MXAccess AddItem and returns the raw reply. func (s *Session) AddItemRaw(ctx context.Context, serverHandle int32, itemDefinition string) (*MxCommandReply, error) { if itemDefinition == "" { return nil, errors.New("mxgateway: item definition is required") } return s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_ADD_ITEM, Payload: &pb.MxCommand_AddItem{ AddItem: &pb.AddItemCommand{ ServerHandle: serverHandle, ItemDefinition: itemDefinition, }, }, }) } // AddItem2 invokes MXAccess AddItem2 and returns the item handle. func (s *Session) AddItem2(ctx context.Context, serverHandle int32, itemDefinition, itemContext string) (int32, error) { reply, err := s.AddItem2Raw(ctx, serverHandle, itemDefinition, itemContext) if err != nil { return 0, err } if reply.GetAddItem2() != nil { return reply.GetAddItem2().GetItemHandle(), nil } return reply.GetReturnValue().GetInt32Value(), nil } // AddItem2Raw invokes MXAccess AddItem2 and returns the raw reply. func (s *Session) AddItem2Raw(ctx context.Context, serverHandle int32, itemDefinition, itemContext string) (*MxCommandReply, error) { if itemDefinition == "" { return nil, errors.New("mxgateway: item definition is required") } return s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_ADD_ITEM2, Payload: &pb.MxCommand_AddItem2{ AddItem2: &pb.AddItem2Command{ ServerHandle: serverHandle, ItemDefinition: itemDefinition, ItemContext: itemContext, }, }, }) } // Advise invokes MXAccess Advise. func (s *Session) Advise(ctx context.Context, serverHandle, itemHandle int32) error { _, err := s.AdviseRaw(ctx, serverHandle, itemHandle) return err } // AdviseRaw invokes MXAccess Advise and returns the raw reply. func (s *Session) AdviseRaw(ctx context.Context, serverHandle, itemHandle int32) (*MxCommandReply, error) { return s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_ADVISE, Payload: &pb.MxCommand_Advise{ Advise: &pb.AdviseCommand{ ServerHandle: serverHandle, ItemHandle: itemHandle, }, }, }) } // UnAdvise invokes MXAccess UnAdvise. func (s *Session) UnAdvise(ctx context.Context, serverHandle, itemHandle int32) error { _, err := s.UnAdviseRaw(ctx, serverHandle, itemHandle) return err } // UnAdviseRaw invokes MXAccess UnAdvise and returns the raw reply. func (s *Session) UnAdviseRaw(ctx context.Context, serverHandle, itemHandle int32) (*MxCommandReply, error) { return s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_UN_ADVISE, Payload: &pb.MxCommand_UnAdvise{ UnAdvise: &pb.UnAdviseCommand{ ServerHandle: serverHandle, ItemHandle: itemHandle, }, }, }) } // AddItemBulk invokes MXAccess AddItem for each tag inside one gateway command. func (s *Session) AddItemBulk(ctx context.Context, serverHandle int32, tagAddresses []string) ([]*SubscribeResult, error) { if tagAddresses == nil { return nil, errors.New("mxgateway: tag addresses are required") } if err := ensureBulkSize("tag addresses", len(tagAddresses)); err != nil { return nil, err } reply, err := s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_ADD_ITEM_BULK, Payload: &pb.MxCommand_AddItemBulk{ AddItemBulk: &pb.AddItemBulkCommand{ ServerHandle: serverHandle, TagAddresses: tagAddresses, }, }, }) if err != nil { return nil, err } return reply.GetAddItemBulk().GetResults(), nil } // AdviseItemBulk invokes MXAccess Advise for each item handle inside one gateway command. func (s *Session) AdviseItemBulk(ctx context.Context, serverHandle int32, itemHandles []int32) ([]*SubscribeResult, error) { if itemHandles == nil { return nil, errors.New("mxgateway: item handles are required") } if err := ensureBulkSize("item handles", len(itemHandles)); err != nil { return nil, err } reply, err := s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_ADVISE_ITEM_BULK, Payload: &pb.MxCommand_AdviseItemBulk{ AdviseItemBulk: &pb.AdviseItemBulkCommand{ ServerHandle: serverHandle, ItemHandles: itemHandles, }, }, }) if err != nil { return nil, err } return reply.GetAdviseItemBulk().GetResults(), nil } // RemoveItemBulk invokes MXAccess RemoveItem for each item handle inside one gateway command. func (s *Session) RemoveItemBulk(ctx context.Context, serverHandle int32, itemHandles []int32) ([]*SubscribeResult, error) { if itemHandles == nil { return nil, errors.New("mxgateway: item handles are required") } if err := ensureBulkSize("item handles", len(itemHandles)); err != nil { return nil, err } reply, err := s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_REMOVE_ITEM_BULK, Payload: &pb.MxCommand_RemoveItemBulk{ RemoveItemBulk: &pb.RemoveItemBulkCommand{ ServerHandle: serverHandle, ItemHandles: itemHandles, }, }, }) if err != nil { return nil, err } return reply.GetRemoveItemBulk().GetResults(), nil } // UnAdviseItemBulk invokes MXAccess UnAdvise for each item handle inside one gateway command. func (s *Session) UnAdviseItemBulk(ctx context.Context, serverHandle int32, itemHandles []int32) ([]*SubscribeResult, error) { if itemHandles == nil { return nil, errors.New("mxgateway: item handles are required") } if err := ensureBulkSize("item handles", len(itemHandles)); err != nil { return nil, err } reply, err := s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_UN_ADVISE_ITEM_BULK, Payload: &pb.MxCommand_UnAdviseItemBulk{ UnAdviseItemBulk: &pb.UnAdviseItemBulkCommand{ ServerHandle: serverHandle, ItemHandles: itemHandles, }, }, }) if err != nil { return nil, err } return reply.GetUnAdviseItemBulk().GetResults(), nil } // SubscribeBulk invokes AddItem and Advise for each tag inside one gateway command. func (s *Session) SubscribeBulk(ctx context.Context, serverHandle int32, tagAddresses []string) ([]*SubscribeResult, error) { if tagAddresses == nil { return nil, errors.New("mxgateway: tag addresses are required") } if err := ensureBulkSize("tag addresses", len(tagAddresses)); err != nil { return nil, err } reply, err := s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_SUBSCRIBE_BULK, Payload: &pb.MxCommand_SubscribeBulk{ SubscribeBulk: &pb.SubscribeBulkCommand{ ServerHandle: serverHandle, TagAddresses: tagAddresses, }, }, }) if err != nil { return nil, err } return reply.GetSubscribeBulk().GetResults(), nil } // UnsubscribeBulk invokes UnAdvise and RemoveItem for each item handle inside one gateway command. func (s *Session) UnsubscribeBulk(ctx context.Context, serverHandle int32, itemHandles []int32) ([]*SubscribeResult, error) { if itemHandles == nil { return nil, errors.New("mxgateway: item handles are required") } if err := ensureBulkSize("item handles", len(itemHandles)); err != nil { return nil, err } reply, err := s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_UNSUBSCRIBE_BULK, Payload: &pb.MxCommand_UnsubscribeBulk{ UnsubscribeBulk: &pb.UnsubscribeBulkCommand{ ServerHandle: serverHandle, ItemHandles: itemHandles, }, }, }) if err != nil { return nil, err } return reply.GetUnsubscribeBulk().GetResults(), nil } // WriteBulk invokes MXAccess Write sequentially for each entry inside one gateway command. // Per-entry failures appear as BulkWriteResult entries with WasSuccessful=false; the call // never returns an error for per-entry MXAccess failures (it returns an error only for // protocol-level failures or transport errors). // // A non-nil but empty entries slice is treated as a no-op and returns an empty result // without a wire round-trip; pass nil to surface a clear "entries are required" error. func (s *Session) WriteBulk(ctx context.Context, serverHandle int32, entries []*WriteBulkEntry) ([]*BulkWriteResult, error) { if entries == nil { return nil, errors.New("mxgateway: write bulk entries are required") } if err := ensureBulkSize("write bulk entries", len(entries)); err != nil { return nil, err } if len(entries) == 0 { return []*BulkWriteResult{}, nil } reply, err := s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_WRITE_BULK, Payload: &pb.MxCommand_WriteBulk{ WriteBulk: &pb.WriteBulkCommand{ ServerHandle: serverHandle, Entries: entries, }, }, }) if err != nil { return nil, err } return reply.GetWriteBulk().GetResults(), nil } // Write2Bulk invokes MXAccess Write2 (timestamped) for each entry inside one gateway command. // // A non-nil but empty entries slice is treated as a no-op and returns an empty result // without a wire round-trip; pass nil to surface a clear "entries are required" error. func (s *Session) Write2Bulk(ctx context.Context, serverHandle int32, entries []*Write2BulkEntry) ([]*BulkWriteResult, error) { if entries == nil { return nil, errors.New("mxgateway: write2 bulk entries are required") } if err := ensureBulkSize("write2 bulk entries", len(entries)); err != nil { return nil, err } if len(entries) == 0 { return []*BulkWriteResult{}, nil } reply, err := s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_WRITE2_BULK, Payload: &pb.MxCommand_Write2Bulk{ Write2Bulk: &pb.Write2BulkCommand{ ServerHandle: serverHandle, Entries: entries, }, }, }) if err != nil { return nil, err } return reply.GetWrite2Bulk().GetResults(), nil } // WriteSecuredBulk invokes MXAccess WriteSecured for each entry. Credential-sensitive // values must not be logged by callers; mirrors the single-item WriteSecured contract. // // A non-nil but empty entries slice is treated as a no-op and returns an empty result // without a wire round-trip; pass nil to surface a clear "entries are required" error. func (s *Session) WriteSecuredBulk(ctx context.Context, serverHandle int32, entries []*WriteSecuredBulkEntry) ([]*BulkWriteResult, error) { if entries == nil { return nil, errors.New("mxgateway: write-secured bulk entries are required") } if err := ensureBulkSize("write-secured bulk entries", len(entries)); err != nil { return nil, err } if len(entries) == 0 { return []*BulkWriteResult{}, nil } reply, err := s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_WRITE_SECURED_BULK, Payload: &pb.MxCommand_WriteSecuredBulk{ WriteSecuredBulk: &pb.WriteSecuredBulkCommand{ ServerHandle: serverHandle, Entries: entries, }, }, }) if err != nil { return nil, err } return reply.GetWriteSecuredBulk().GetResults(), nil } // WriteSecured2Bulk invokes MXAccess WriteSecured2 (timestamped) for each entry. // // A non-nil but empty entries slice is treated as a no-op and returns an empty result // without a wire round-trip; pass nil to surface a clear "entries are required" error. func (s *Session) WriteSecured2Bulk(ctx context.Context, serverHandle int32, entries []*WriteSecured2BulkEntry) ([]*BulkWriteResult, error) { if entries == nil { return nil, errors.New("mxgateway: write-secured2 bulk entries are required") } if err := ensureBulkSize("write-secured2 bulk entries", len(entries)); err != nil { return nil, err } if len(entries) == 0 { return []*BulkWriteResult{}, nil } reply, err := s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_WRITE_SECURED2_BULK, Payload: &pb.MxCommand_WriteSecured2Bulk{ WriteSecured2Bulk: &pb.WriteSecured2BulkCommand{ ServerHandle: serverHandle, Entries: entries, }, }, }) if err != nil { return nil, err } return reply.GetWriteSecured2Bulk().GetResults(), nil } // ReadBulk snapshots the current value of each requested tag. // // MXAccess COM has no synchronous Read; the worker satisfies this by returning the // most recent cached OnDataChange value when the tag is already advised (WasCached=true), // or by taking a full AddItem + Advise + wait + UnAdvise + RemoveItem snapshot lifecycle // otherwise. timeout bounds the wait per tag in the snapshot case; pass zero to use the // worker default. Per-tag failures (timeout, invalid tag) appear as BulkReadResult entries // with WasSuccessful=false; the call never returns an error for per-tag MXAccess failures. // // A non-nil but empty tagAddresses slice is treated as a no-op and returns an empty // result without a wire round-trip; pass nil to surface a clear "tag addresses are // required" error. func (s *Session) ReadBulk(ctx context.Context, serverHandle int32, tagAddresses []string, timeout time.Duration) ([]*BulkReadResult, error) { if tagAddresses == nil { return nil, errors.New("mxgateway: tag addresses are required") } if err := ensureBulkSize("tag addresses", len(tagAddresses)); err != nil { return nil, err } if len(tagAddresses) == 0 { return []*BulkReadResult{}, nil } var timeoutMs uint32 if timeout > 0 { ms := timeout.Milliseconds() if ms > int64(^uint32(0)) { timeoutMs = ^uint32(0) } else { timeoutMs = uint32(ms) } } reply, err := s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_READ_BULK, Payload: &pb.MxCommand_ReadBulk{ ReadBulk: &pb.ReadBulkCommand{ ServerHandle: serverHandle, TagAddresses: tagAddresses, TimeoutMs: timeoutMs, }, }, }) if err != nil { return nil, err } return reply.GetReadBulk().GetResults(), nil } // Write invokes MXAccess Write. func (s *Session) Write(ctx context.Context, serverHandle, itemHandle int32, value *MxValue, userID int32) error { _, err := s.WriteRaw(ctx, serverHandle, itemHandle, value, userID) return err } // WriteRaw invokes MXAccess Write and returns the raw reply. func (s *Session) WriteRaw(ctx context.Context, serverHandle, itemHandle int32, value *MxValue, userID int32) (*MxCommandReply, error) { if value == nil { return nil, errors.New("mxgateway: write value is required") } return s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_WRITE, Payload: &pb.MxCommand_Write{ Write: &pb.WriteCommand{ ServerHandle: serverHandle, ItemHandle: itemHandle, Value: value, UserId: userID, }, }, }) } // Events streams ordered session events until the server ends the stream, // context cancellation stops Recv, or a terminal error is sent. func (s *Session) Events(ctx context.Context) (<-chan EventResult, error) { return s.EventsAfter(ctx, 0) } // EventsAfter streams ordered session events after the given worker sequence. func (s *Session) EventsAfter(ctx context.Context, afterWorkerSequence uint64) (<-chan EventResult, error) { subscription, err := s.subscribeEventsAfter(ctx, afterWorkerSequence, true) if err != nil { return nil, err } return subscription.Events(), nil } // SubscribeEvents starts an owned event subscription. func (s *Session) SubscribeEvents(ctx context.Context) (*EventSubscription, error) { return s.SubscribeEventsAfter(ctx, 0) } // SubscribeEventsAfter starts an owned event subscription after the given worker sequence. func (s *Session) SubscribeEventsAfter(ctx context.Context, afterWorkerSequence uint64) (*EventSubscription, error) { return s.subscribeEventsAfter(ctx, afterWorkerSequence, false) } func (s *Session) subscribeEventsAfter(ctx context.Context, afterWorkerSequence uint64, cancelWhenResultBufferFull bool) (*EventSubscription, error) { streamCtx, cancel := context.WithCancel(ctx) stream, err := s.client.StreamEventsRaw(streamCtx, &pb.StreamEventsRequest{ SessionId: s.ID(), AfterWorkerSequence: afterWorkerSequence, }) if err != nil { cancel() return nil, err } results := make(chan EventResult, 16) done := make(chan struct{}) go func() { defer close(results) defer close(done) for { event, err := stream.Recv() if err == nil { if !sendEventResult(streamCtx, results, EventResult{Event: event}, cancelWhenResultBufferFull, cancel) { return } continue } if err == io.EOF || status.Code(err) == codes.Canceled || streamCtx.Err() != nil { return } sendEventResult( streamCtx, results, EventResult{Err: &GatewayError{Op: "stream events", Err: err}}, cancelWhenResultBufferFull, cancel) return } }() return &EventSubscription{ results: results, cancel: cancel, done: done, }, nil } func ensureBulkSize(name string, length int) error { if length > maxBulkItems { return fmt.Errorf("mxgateway: %s bulk commands are limited to %d item(s)", name, maxBulkItems) } return nil } func sendEventResult( ctx context.Context, results chan<- EventResult, result EventResult, cancelWhenBufferFull bool, cancel context.CancelFunc, ) bool { if cancelWhenBufferFull { select { case results <- result: return true case <-ctx.Done(): return false default: cancel() return false } } select { case results <- result: return true case <-ctx.Done(): return false } } func (s *Session) invokeCommand(ctx context.Context, command *MxCommand) (*MxCommandReply, error) { return s.client.Invoke(ctx, &pb.MxCommandRequest{ SessionId: s.ID(), ClientCorrelationId: newCorrelationID(), Command: command, }) } func newCorrelationID() string { var buffer [16]byte if _, err := rand.Read(buffer[:]); err != nil { return "" } return hex.EncodeToString(buffer[:]) }