Add bulk MXAccess subscription commands

This commit is contained in:
Joseph Doherty
2026-04-26 22:29:27 -04:00
parent daff16cfd2
commit 3d11ac3316
31 changed files with 14346 additions and 969 deletions
@@ -137,6 +137,44 @@ public sealed class MxGatewayClientSessionTests
Assert.Equal(56, request.Command.Write2.UserId); Assert.Equal(56, request.Command.Write2.UserId);
} }
[Fact]
public async Task SubscribeBulkAsync_BuildsOneBulkCommandAndReturnsPerItemResults()
{
FakeGatewayTransport transport = CreateTransport();
transport.AddInvokeReply(new MxCommandReply
{
SessionId = "session-fixture",
Kind = MxCommandKind.SubscribeBulk,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
SubscribeBulk = new BulkSubscribeReply
{
Results =
{
new SubscribeResult
{
ServerHandle = 12,
TagAddress = "Area001.Pump001.Speed",
ItemHandle = 34,
WasSuccessful = true,
},
},
},
});
await using MxGatewayClient client = CreateClient(transport);
MxGatewaySession session = await client.OpenSessionAsync();
IReadOnlyList<SubscribeResult> results = await session.SubscribeBulkAsync(
12,
["Area001.Pump001.Speed"]);
SubscribeResult result = Assert.Single(results);
Assert.Equal(34, result.ItemHandle);
MxCommandRequest request = Assert.Single(transport.InvokeCalls).Request;
Assert.Equal(MxCommandKind.SubscribeBulk, request.Command.Kind);
Assert.Equal(12, request.Command.SubscribeBulk.ServerHandle);
Assert.Equal(["Area001.Pump001.Speed"], request.Command.SubscribeBulk.TagAddresses);
}
[Fact] [Fact]
public async Task StreamEventsAsync_YieldsEventsInGatewayOrder() public async Task StreamEventsAsync_YieldsEventsInGatewayOrder()
{ {
@@ -175,6 +175,194 @@ public sealed class MxGatewaySession : IAsyncDisposable
cancellationToken); cancellationToken);
} }
public async Task UnAdviseAsync(
int serverHandle,
int itemHandle,
CancellationToken cancellationToken = default)
{
MxCommandReply reply = await UnAdviseRawAsync(serverHandle, itemHandle, cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
}
public Task<MxCommandReply> UnAdviseRawAsync(
int serverHandle,
int itemHandle,
CancellationToken cancellationToken = default)
{
return InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.UnAdvise,
UnAdvise = new UnAdviseCommand
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
},
},
cancellationToken);
}
public async Task RemoveItemAsync(
int serverHandle,
int itemHandle,
CancellationToken cancellationToken = default)
{
MxCommandReply reply = await RemoveItemRawAsync(serverHandle, itemHandle, cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
}
public Task<MxCommandReply> RemoveItemRawAsync(
int serverHandle,
int itemHandle,
CancellationToken cancellationToken = default)
{
return InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.RemoveItem,
RemoveItem = new RemoveItemCommand
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
},
},
cancellationToken);
}
public async Task<IReadOnlyList<SubscribeResult>> AddItemBulkAsync(
int serverHandle,
IReadOnlyList<string> tagAddresses,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(tagAddresses);
AddItemBulkCommand command = new() { ServerHandle = serverHandle };
command.TagAddresses.Add(tagAddresses);
MxCommandReply reply = await InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.AddItemBulk,
AddItemBulk = command,
},
cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.AddItemBulk?.Results.ToArray() ?? [];
}
public async Task<IReadOnlyList<SubscribeResult>> AdviseItemBulkAsync(
int serverHandle,
IReadOnlyList<int> itemHandles,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(itemHandles);
AdviseItemBulkCommand command = new() { ServerHandle = serverHandle };
command.ItemHandles.Add(itemHandles);
MxCommandReply reply = await InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.AdviseItemBulk,
AdviseItemBulk = command,
},
cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.AdviseItemBulk?.Results.ToArray() ?? [];
}
public async Task<IReadOnlyList<SubscribeResult>> RemoveItemBulkAsync(
int serverHandle,
IReadOnlyList<int> itemHandles,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(itemHandles);
RemoveItemBulkCommand command = new() { ServerHandle = serverHandle };
command.ItemHandles.Add(itemHandles);
MxCommandReply reply = await InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.RemoveItemBulk,
RemoveItemBulk = command,
},
cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.RemoveItemBulk?.Results.ToArray() ?? [];
}
public async Task<IReadOnlyList<SubscribeResult>> UnAdviseItemBulkAsync(
int serverHandle,
IReadOnlyList<int> itemHandles,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(itemHandles);
UnAdviseItemBulkCommand command = new() { ServerHandle = serverHandle };
command.ItemHandles.Add(itemHandles);
MxCommandReply reply = await InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.UnAdviseItemBulk,
UnAdviseItemBulk = command,
},
cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.UnAdviseItemBulk?.Results.ToArray() ?? [];
}
public async Task<IReadOnlyList<SubscribeResult>> SubscribeBulkAsync(
int serverHandle,
IReadOnlyList<string> tagAddresses,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(tagAddresses);
SubscribeBulkCommand command = new() { ServerHandle = serverHandle };
command.TagAddresses.Add(tagAddresses);
MxCommandReply reply = await InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.SubscribeBulk,
SubscribeBulk = command,
},
cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.SubscribeBulk?.Results.ToArray() ?? [];
}
public async Task<IReadOnlyList<SubscribeResult>> UnsubscribeBulkAsync(
int serverHandle,
IReadOnlyList<int> itemHandles,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(itemHandles);
UnsubscribeBulkCommand command = new() { ServerHandle = serverHandle };
command.ItemHandles.Add(itemHandles);
MxCommandReply reply = await InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.UnsubscribeBulk,
UnsubscribeBulk = command,
},
cancellationToken)
.ConfigureAwait(false);
reply.EnsureProtocolSuccess().EnsureMxAccessSuccess();
return reply.UnsubscribeBulk?.Results.ToArray() ?? [];
}
public async Task WriteAsync( public async Task WriteAsync(
int serverHandle, int serverHandle,
int itemHandle, int itemHandle,
@@ -297,4 +485,5 @@ public sealed class MxGatewaySession : IAsyncDisposable
}, },
cancellationToken); cancellationToken);
} }
} }
File diff suppressed because it is too large Load Diff
@@ -117,6 +117,49 @@ func TestSessionHelpersBuildCommandsAndExposeRawReply(t *testing.T) {
} }
} }
func TestSubscribeBulkBuildsOneBulkCommandAndReturnsResults(t *testing.T) {
fake := &fakeGatewayServer{
invokeReply: &pb.MxCommandReply{
SessionId: "session-1",
Kind: pb.MxCommandKind_MX_COMMAND_KIND_SUBSCRIBE_BULK,
ProtocolStatus: &pb.ProtocolStatus{
Code: pb.ProtocolStatusCode_PROTOCOL_STATUS_CODE_OK,
},
Payload: &pb.MxCommandReply_SubscribeBulk{
SubscribeBulk: &pb.BulkSubscribeReply{
Results: []*pb.SubscribeResult{
{
ServerHandle: 12,
TagAddress: "Area001.Pump001.Speed",
ItemHandle: 34,
WasSuccessful: true,
},
},
},
},
},
}
client, cleanup := newBufconnClient(t, fake)
defer cleanup()
session := NewSessionForID(client, "session-1")
results, err := session.SubscribeBulk(context.Background(), 12, []string{"Area001.Pump001.Speed"})
if err != nil {
t.Fatalf("SubscribeBulk() error = %v", err)
}
if len(results) != 1 || results[0].GetItemHandle() != 34 {
t.Fatalf("results = %#v, want item handle 34", results)
}
req := fake.invokeRequest
if req.GetCommand().GetKind() != pb.MxCommandKind_MX_COMMAND_KIND_SUBSCRIBE_BULK {
t.Fatalf("command kind = %s", req.GetCommand().GetKind())
}
if got := req.GetCommand().GetSubscribeBulk().GetTagAddresses(); len(got) != 1 || got[0] != "Area001.Pump001.Speed" {
t.Fatalf("tag addresses = %#v", got)
}
}
func TestInvokeReturnsTypedMxAccessErrorWithRawReply(t *testing.T) { func TestInvokeReturnsTypedMxAccessErrorWithRawReply(t *testing.T) {
hresult := int32(-2147467259) hresult := int32(-2147467259)
fake := &fakeGatewayServer{ fake := &fakeGatewayServer{
+158
View File
@@ -104,6 +104,25 @@ func (s *Session) Unregister(ctx context.Context, serverHandle int32) error {
return err return err
} }
// RemoveItem invokes MXAccess RemoveItem.
func (s *Session) RemoveItem(ctx context.Context, serverHandle, itemHandle int32) error {
_, err := s.RemoveItemRaw(ctx, serverHandle, itemHandle)
return err
}
// RemoveItemRaw invokes MXAccess RemoveItem and returns the raw reply.
func (s *Session) RemoveItemRaw(ctx context.Context, serverHandle, itemHandle int32) (*MxCommandReply, error) {
return s.invokeCommand(ctx, &pb.MxCommand{
Kind: pb.MxCommandKind_MX_COMMAND_KIND_REMOVE_ITEM,
Payload: &pb.MxCommand_RemoveItem{
RemoveItem: &pb.RemoveItemCommand{
ServerHandle: serverHandle,
ItemHandle: itemHandle,
},
},
})
}
// AddItem invokes MXAccess AddItem and returns the item handle. // AddItem invokes MXAccess AddItem and returns the item handle.
func (s *Session) AddItem(ctx context.Context, serverHandle int32, itemDefinition string) (int32, error) { func (s *Session) AddItem(ctx context.Context, serverHandle int32, itemDefinition string) (int32, error) {
reply, err := s.AddItemRaw(ctx, serverHandle, itemDefinition) reply, err := s.AddItemRaw(ctx, serverHandle, itemDefinition)
@@ -182,6 +201,145 @@ func (s *Session) AdviseRaw(ctx context.Context, serverHandle, itemHandle int32)
}) })
} }
// UnAdvise invokes MXAccess UnAdvise.
func (s *Session) UnAdvise(ctx context.Context, serverHandle, itemHandle int32) error {
_, err := s.UnAdviseRaw(ctx, serverHandle, itemHandle)
return err
}
// UnAdviseRaw invokes MXAccess UnAdvise and returns the raw reply.
func (s *Session) UnAdviseRaw(ctx context.Context, serverHandle, itemHandle int32) (*MxCommandReply, error) {
return s.invokeCommand(ctx, &pb.MxCommand{
Kind: pb.MxCommandKind_MX_COMMAND_KIND_UN_ADVISE,
Payload: &pb.MxCommand_UnAdvise{
UnAdvise: &pb.UnAdviseCommand{
ServerHandle: serverHandle,
ItemHandle: itemHandle,
},
},
})
}
// AddItemBulk invokes MXAccess AddItem for each tag inside one gateway command.
func (s *Session) AddItemBulk(ctx context.Context, serverHandle int32, tagAddresses []string) ([]*SubscribeResult, error) {
if tagAddresses == nil {
return nil, errors.New("mxgateway: tag addresses are required")
}
reply, err := s.invokeCommand(ctx, &pb.MxCommand{
Kind: pb.MxCommandKind_MX_COMMAND_KIND_ADD_ITEM_BULK,
Payload: &pb.MxCommand_AddItemBulk{
AddItemBulk: &pb.AddItemBulkCommand{
ServerHandle: serverHandle,
TagAddresses: tagAddresses,
},
},
})
if err != nil {
return nil, err
}
return reply.GetAddItemBulk().GetResults(), nil
}
// AdviseItemBulk invokes MXAccess Advise for each item handle inside one gateway command.
func (s *Session) AdviseItemBulk(ctx context.Context, serverHandle int32, itemHandles []int32) ([]*SubscribeResult, error) {
if itemHandles == nil {
return nil, errors.New("mxgateway: item handles are required")
}
reply, err := s.invokeCommand(ctx, &pb.MxCommand{
Kind: pb.MxCommandKind_MX_COMMAND_KIND_ADVISE_ITEM_BULK,
Payload: &pb.MxCommand_AdviseItemBulk{
AdviseItemBulk: &pb.AdviseItemBulkCommand{
ServerHandle: serverHandle,
ItemHandles: itemHandles,
},
},
})
if err != nil {
return nil, err
}
return reply.GetAdviseItemBulk().GetResults(), nil
}
// RemoveItemBulk invokes MXAccess RemoveItem for each item handle inside one gateway command.
func (s *Session) RemoveItemBulk(ctx context.Context, serverHandle int32, itemHandles []int32) ([]*SubscribeResult, error) {
if itemHandles == nil {
return nil, errors.New("mxgateway: item handles are required")
}
reply, err := s.invokeCommand(ctx, &pb.MxCommand{
Kind: pb.MxCommandKind_MX_COMMAND_KIND_REMOVE_ITEM_BULK,
Payload: &pb.MxCommand_RemoveItemBulk{
RemoveItemBulk: &pb.RemoveItemBulkCommand{
ServerHandle: serverHandle,
ItemHandles: itemHandles,
},
},
})
if err != nil {
return nil, err
}
return reply.GetRemoveItemBulk().GetResults(), nil
}
// UnAdviseItemBulk invokes MXAccess UnAdvise for each item handle inside one gateway command.
func (s *Session) UnAdviseItemBulk(ctx context.Context, serverHandle int32, itemHandles []int32) ([]*SubscribeResult, error) {
if itemHandles == nil {
return nil, errors.New("mxgateway: item handles are required")
}
reply, err := s.invokeCommand(ctx, &pb.MxCommand{
Kind: pb.MxCommandKind_MX_COMMAND_KIND_UN_ADVISE_ITEM_BULK,
Payload: &pb.MxCommand_UnAdviseItemBulk{
UnAdviseItemBulk: &pb.UnAdviseItemBulkCommand{
ServerHandle: serverHandle,
ItemHandles: itemHandles,
},
},
})
if err != nil {
return nil, err
}
return reply.GetUnAdviseItemBulk().GetResults(), nil
}
// SubscribeBulk invokes AddItem and Advise for each tag inside one gateway command.
func (s *Session) SubscribeBulk(ctx context.Context, serverHandle int32, tagAddresses []string) ([]*SubscribeResult, error) {
if tagAddresses == nil {
return nil, errors.New("mxgateway: tag addresses are required")
}
reply, err := s.invokeCommand(ctx, &pb.MxCommand{
Kind: pb.MxCommandKind_MX_COMMAND_KIND_SUBSCRIBE_BULK,
Payload: &pb.MxCommand_SubscribeBulk{
SubscribeBulk: &pb.SubscribeBulkCommand{
ServerHandle: serverHandle,
TagAddresses: tagAddresses,
},
},
})
if err != nil {
return nil, err
}
return reply.GetSubscribeBulk().GetResults(), nil
}
// UnsubscribeBulk invokes UnAdvise and RemoveItem for each item handle inside one gateway command.
func (s *Session) UnsubscribeBulk(ctx context.Context, serverHandle int32, itemHandles []int32) ([]*SubscribeResult, error) {
if itemHandles == nil {
return nil, errors.New("mxgateway: item handles are required")
}
reply, err := s.invokeCommand(ctx, &pb.MxCommand{
Kind: pb.MxCommandKind_MX_COMMAND_KIND_UNSUBSCRIBE_BULK,
Payload: &pb.MxCommand_UnsubscribeBulk{
UnsubscribeBulk: &pb.UnsubscribeBulkCommand{
ServerHandle: serverHandle,
ItemHandles: itemHandles,
},
},
})
if err != nil {
return nil, err
}
return reply.GetUnsubscribeBulk().GetResults(), nil
}
// Write invokes MXAccess Write. // Write invokes MXAccess Write.
func (s *Session) Write(ctx context.Context, serverHandle, itemHandle int32, value *MxValue, userID int32) error { func (s *Session) Write(ctx context.Context, serverHandle, itemHandle int32, value *MxValue, userID int32) error {
_, err := s.WriteRaw(ctx, serverHandle, itemHandle, value, userID) _, err := s.WriteRaw(ctx, serverHandle, itemHandle, value, userID)
+49 -31
View File
@@ -12,30 +12,40 @@ type RawEventStream = pb.MxAccessGateway_StreamEventsClient
// Generated protobuf aliases keep raw contract access available from the public // Generated protobuf aliases keep raw contract access available from the public
// mxgateway package while generated code remains under internal/generated. // mxgateway package while generated code remains under internal/generated.
type ( type (
OpenSessionRequest = pb.OpenSessionRequest OpenSessionRequest = pb.OpenSessionRequest
OpenSessionReply = pb.OpenSessionReply OpenSessionReply = pb.OpenSessionReply
CloseSessionRequest = pb.CloseSessionRequest CloseSessionRequest = pb.CloseSessionRequest
CloseSessionReply = pb.CloseSessionReply CloseSessionReply = pb.CloseSessionReply
StreamEventsRequest = pb.StreamEventsRequest StreamEventsRequest = pb.StreamEventsRequest
MxCommandRequest = pb.MxCommandRequest MxCommandRequest = pb.MxCommandRequest
MxCommandReply = pb.MxCommandReply MxCommandReply = pb.MxCommandReply
MxCommand = pb.MxCommand MxCommand = pb.MxCommand
MxEvent = pb.MxEvent MxEvent = pb.MxEvent
MxValue = pb.MxValue MxValue = pb.MxValue
Value = pb.MxValue Value = pb.MxValue
MxArray = pb.MxArray MxArray = pb.MxArray
MxStatusProxy = pb.MxStatusProxy MxStatusProxy = pb.MxStatusProxy
ProtocolStatus = pb.ProtocolStatus ProtocolStatus = pb.ProtocolStatus
RegisterCommand = pb.RegisterCommand RegisterCommand = pb.RegisterCommand
UnregisterCommand = pb.UnregisterCommand UnregisterCommand = pb.UnregisterCommand
AddItemCommand = pb.AddItemCommand AddItemCommand = pb.AddItemCommand
AddItem2Command = pb.AddItem2Command AddItem2Command = pb.AddItem2Command
AdviseCommand = pb.AdviseCommand RemoveItemCommand = pb.RemoveItemCommand
WriteCommand = pb.WriteCommand AdviseCommand = pb.AdviseCommand
Write2Command = pb.Write2Command UnAdviseCommand = pb.UnAdviseCommand
RegisterReply = pb.RegisterReply AddItemBulkCommand = pb.AddItemBulkCommand
AddItemReply = pb.AddItemReply AdviseItemBulkCommand = pb.AdviseItemBulkCommand
AddItem2Reply = pb.AddItem2Reply RemoveItemBulkCommand = pb.RemoveItemBulkCommand
UnAdviseItemBulkCommand = pb.UnAdviseItemBulkCommand
SubscribeBulkCommand = pb.SubscribeBulkCommand
UnsubscribeBulkCommand = pb.UnsubscribeBulkCommand
WriteCommand = pb.WriteCommand
Write2Command = pb.Write2Command
RegisterReply = pb.RegisterReply
AddItemReply = pb.AddItemReply
AddItem2Reply = pb.AddItem2Reply
SubscribeResult = pb.SubscribeResult
BulkSubscribeReply = pb.BulkSubscribeReply
) )
type ( type (
@@ -49,13 +59,21 @@ type (
) )
const ( const (
CommandKindRegister = pb.MxCommandKind_MX_COMMAND_KIND_REGISTER CommandKindRegister = pb.MxCommandKind_MX_COMMAND_KIND_REGISTER
CommandKindUnregister = pb.MxCommandKind_MX_COMMAND_KIND_UNREGISTER CommandKindUnregister = pb.MxCommandKind_MX_COMMAND_KIND_UNREGISTER
CommandKindAddItem = pb.MxCommandKind_MX_COMMAND_KIND_ADD_ITEM CommandKindAddItem = pb.MxCommandKind_MX_COMMAND_KIND_ADD_ITEM
CommandKindAddItem2 = pb.MxCommandKind_MX_COMMAND_KIND_ADD_ITEM2 CommandKindAddItem2 = pb.MxCommandKind_MX_COMMAND_KIND_ADD_ITEM2
CommandKindAdvise = pb.MxCommandKind_MX_COMMAND_KIND_ADVISE CommandKindRemoveItem = pb.MxCommandKind_MX_COMMAND_KIND_REMOVE_ITEM
CommandKindWrite = pb.MxCommandKind_MX_COMMAND_KIND_WRITE CommandKindAdvise = pb.MxCommandKind_MX_COMMAND_KIND_ADVISE
CommandKindWrite2 = pb.MxCommandKind_MX_COMMAND_KIND_WRITE2 CommandKindUnAdvise = pb.MxCommandKind_MX_COMMAND_KIND_UN_ADVISE
CommandKindAddItemBulk = pb.MxCommandKind_MX_COMMAND_KIND_ADD_ITEM_BULK
CommandKindAdviseItemBulk = pb.MxCommandKind_MX_COMMAND_KIND_ADVISE_ITEM_BULK
CommandKindRemoveItemBulk = pb.MxCommandKind_MX_COMMAND_KIND_REMOVE_ITEM_BULK
CommandKindUnAdviseItemBulk = pb.MxCommandKind_MX_COMMAND_KIND_UN_ADVISE_ITEM_BULK
CommandKindSubscribeBulk = pb.MxCommandKind_MX_COMMAND_KIND_SUBSCRIBE_BULK
CommandKindUnsubscribeBulk = pb.MxCommandKind_MX_COMMAND_KIND_UNSUBSCRIBE_BULK
CommandKindWrite = pb.MxCommandKind_MX_COMMAND_KIND_WRITE
CommandKindWrite2 = pb.MxCommandKind_MX_COMMAND_KIND_WRITE2
DataTypeUnknown = pb.MxDataType_MX_DATA_TYPE_UNKNOWN DataTypeUnknown = pb.MxDataType_MX_DATA_TYPE_UNKNOWN
DataTypeBoolean = pb.MxDataType_MX_DATA_TYPE_BOOLEAN DataTypeBoolean = pb.MxDataType_MX_DATA_TYPE_BOOLEAN
@@ -2,9 +2,12 @@ package com.dohertylan.mxgateway.client;
import java.security.SecureRandom; import java.security.SecureRandom;
import java.util.HexFormat; import java.util.HexFormat;
import java.util.List;
import java.util.Objects; import java.util.Objects;
import mxaccess_gateway.v1.MxaccessGateway.AddItem2Command; import mxaccess_gateway.v1.MxaccessGateway.AddItem2Command;
import mxaccess_gateway.v1.MxaccessGateway.AddItemBulkCommand;
import mxaccess_gateway.v1.MxaccessGateway.AddItemCommand; import mxaccess_gateway.v1.MxaccessGateway.AddItemCommand;
import mxaccess_gateway.v1.MxaccessGateway.AdviseItemBulkCommand;
import mxaccess_gateway.v1.MxaccessGateway.AdviseCommand; import mxaccess_gateway.v1.MxaccessGateway.AdviseCommand;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply; import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest; import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
@@ -15,7 +18,14 @@ import mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest;
import mxaccess_gateway.v1.MxaccessGateway.MxValue; import mxaccess_gateway.v1.MxaccessGateway.MxValue;
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply; import mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply;
import mxaccess_gateway.v1.MxaccessGateway.RegisterCommand; import mxaccess_gateway.v1.MxaccessGateway.RegisterCommand;
import mxaccess_gateway.v1.MxaccessGateway.RemoveItemBulkCommand;
import mxaccess_gateway.v1.MxaccessGateway.RemoveItemCommand;
import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest; import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest;
import mxaccess_gateway.v1.MxaccessGateway.SubscribeBulkCommand;
import mxaccess_gateway.v1.MxaccessGateway.SubscribeResult;
import mxaccess_gateway.v1.MxaccessGateway.UnAdviseCommand;
import mxaccess_gateway.v1.MxaccessGateway.UnAdviseItemBulkCommand;
import mxaccess_gateway.v1.MxaccessGateway.UnsubscribeBulkCommand;
import mxaccess_gateway.v1.MxaccessGateway.UnregisterCommand; import mxaccess_gateway.v1.MxaccessGateway.UnregisterCommand;
import mxaccess_gateway.v1.MxaccessGateway.Write2Command; import mxaccess_gateway.v1.MxaccessGateway.Write2Command;
import mxaccess_gateway.v1.MxaccessGateway.WriteCommand; import mxaccess_gateway.v1.MxaccessGateway.WriteCommand;
@@ -117,6 +127,19 @@ public final class MxGatewaySession implements AutoCloseable {
.build()); .build());
} }
public void removeItem(int serverHandle, int itemHandle) {
removeItemRaw(serverHandle, itemHandle);
}
public MxCommandReply removeItemRaw(int serverHandle, int itemHandle) {
return invokeCommand(MxCommand.newBuilder()
.setKind(MxCommandKind.MX_COMMAND_KIND_REMOVE_ITEM)
.setRemoveItem(RemoveItemCommand.newBuilder()
.setServerHandle(serverHandle)
.setItemHandle(itemHandle))
.build());
}
public void advise(int serverHandle, int itemHandle) { public void advise(int serverHandle, int itemHandle) {
adviseRaw(serverHandle, itemHandle); adviseRaw(serverHandle, itemHandle);
} }
@@ -130,6 +153,85 @@ public final class MxGatewaySession implements AutoCloseable {
.build()); .build());
} }
public void unAdvise(int serverHandle, int itemHandle) {
unAdviseRaw(serverHandle, itemHandle);
}
public MxCommandReply unAdviseRaw(int serverHandle, int itemHandle) {
return invokeCommand(MxCommand.newBuilder()
.setKind(MxCommandKind.MX_COMMAND_KIND_UN_ADVISE)
.setUnAdvise(UnAdviseCommand.newBuilder()
.setServerHandle(serverHandle)
.setItemHandle(itemHandle))
.build());
}
public List<SubscribeResult> addItemBulk(int serverHandle, List<String> tagAddresses) {
Objects.requireNonNull(tagAddresses, "tagAddresses");
MxCommandReply reply = invokeCommand(MxCommand.newBuilder()
.setKind(MxCommandKind.MX_COMMAND_KIND_ADD_ITEM_BULK)
.setAddItemBulk(AddItemBulkCommand.newBuilder()
.setServerHandle(serverHandle)
.addAllTagAddresses(tagAddresses))
.build());
return reply.getAddItemBulk().getResultsList();
}
public List<SubscribeResult> adviseItemBulk(int serverHandle, List<Integer> itemHandles) {
Objects.requireNonNull(itemHandles, "itemHandles");
MxCommandReply reply = invokeCommand(MxCommand.newBuilder()
.setKind(MxCommandKind.MX_COMMAND_KIND_ADVISE_ITEM_BULK)
.setAdviseItemBulk(AdviseItemBulkCommand.newBuilder()
.setServerHandle(serverHandle)
.addAllItemHandles(itemHandles))
.build());
return reply.getAdviseItemBulk().getResultsList();
}
public List<SubscribeResult> removeItemBulk(int serverHandle, List<Integer> itemHandles) {
Objects.requireNonNull(itemHandles, "itemHandles");
MxCommandReply reply = invokeCommand(MxCommand.newBuilder()
.setKind(MxCommandKind.MX_COMMAND_KIND_REMOVE_ITEM_BULK)
.setRemoveItemBulk(RemoveItemBulkCommand.newBuilder()
.setServerHandle(serverHandle)
.addAllItemHandles(itemHandles))
.build());
return reply.getRemoveItemBulk().getResultsList();
}
public List<SubscribeResult> unAdviseItemBulk(int serverHandle, List<Integer> itemHandles) {
Objects.requireNonNull(itemHandles, "itemHandles");
MxCommandReply reply = invokeCommand(MxCommand.newBuilder()
.setKind(MxCommandKind.MX_COMMAND_KIND_UN_ADVISE_ITEM_BULK)
.setUnAdviseItemBulk(UnAdviseItemBulkCommand.newBuilder()
.setServerHandle(serverHandle)
.addAllItemHandles(itemHandles))
.build());
return reply.getUnAdviseItemBulk().getResultsList();
}
public List<SubscribeResult> subscribeBulk(int serverHandle, List<String> tagAddresses) {
Objects.requireNonNull(tagAddresses, "tagAddresses");
MxCommandReply reply = invokeCommand(MxCommand.newBuilder()
.setKind(MxCommandKind.MX_COMMAND_KIND_SUBSCRIBE_BULK)
.setSubscribeBulk(SubscribeBulkCommand.newBuilder()
.setServerHandle(serverHandle)
.addAllTagAddresses(tagAddresses))
.build());
return reply.getSubscribeBulk().getResultsList();
}
public List<SubscribeResult> unsubscribeBulk(int serverHandle, List<Integer> itemHandles) {
Objects.requireNonNull(itemHandles, "itemHandles");
MxCommandReply reply = invokeCommand(MxCommand.newBuilder()
.setKind(MxCommandKind.MX_COMMAND_KIND_UNSUBSCRIBE_BULK)
.setUnsubscribeBulk(UnsubscribeBulkCommand.newBuilder()
.setServerHandle(serverHandle)
.addAllItemHandles(itemHandles))
.build());
return reply.getUnsubscribeBulk().getResultsList();
}
public void write(int serverHandle, int itemHandle, MxValue value, int userId) { public void write(int serverHandle, int itemHandle, MxValue value, int userId) {
writeRaw(serverHandle, itemHandle, value, userId); writeRaw(serverHandle, itemHandle, value, userId);
} }
@@ -17,12 +17,14 @@ import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import java.time.Duration; import java.time.Duration;
import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import mxaccess_gateway.v1.MxAccessGatewayGrpc; import mxaccess_gateway.v1.MxAccessGatewayGrpc;
import mxaccess_gateway.v1.MxaccessGateway.AddItemReply; import mxaccess_gateway.v1.MxaccessGateway.AddItemReply;
import mxaccess_gateway.v1.MxaccessGateway.BulkSubscribeReply;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply; import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest; import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
import mxaccess_gateway.v1.MxaccessGateway.MxCommandKind; import mxaccess_gateway.v1.MxaccessGateway.MxCommandKind;
@@ -36,6 +38,7 @@ import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode;
import mxaccess_gateway.v1.MxaccessGateway.RegisterReply; import mxaccess_gateway.v1.MxaccessGateway.RegisterReply;
import mxaccess_gateway.v1.MxaccessGateway.SessionState; import mxaccess_gateway.v1.MxaccessGateway.SessionState;
import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest; import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest;
import mxaccess_gateway.v1.MxaccessGateway.SubscribeResult;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
final class MxGatewayClientSessionTests { final class MxGatewayClientSessionTests {
@@ -112,6 +115,42 @@ final class MxGatewayClientSessionTests {
} }
} }
@Test
void subscribeBulkBuildsOneBulkCommandAndReturnsResults() throws Exception {
AtomicReference<MxCommandRequest> commandRequest = new AtomicReference<>();
TestGatewayService service = new TestGatewayService() {
@Override
public void invoke(MxCommandRequest request, StreamObserver<MxCommandReply> responseObserver) {
commandRequest.set(request);
responseObserver.onNext(MxCommandReply.newBuilder()
.setSessionId(request.getSessionId())
.setKind(request.getCommand().getKind())
.setProtocolStatus(ok())
.setSubscribeBulk(BulkSubscribeReply.newBuilder()
.addResults(SubscribeResult.newBuilder()
.setServerHandle(12)
.setTagAddress("Area001.Pump001.Speed")
.setItemHandle(34)
.setWasSuccessful(true)))
.build());
responseObserver.onCompleted();
}
};
try (InProcessGateway gateway = InProcessGateway.start(service, new AtomicReference<>());
MxGatewayClient client = gateway.client("", Duration.ofSeconds(5))) {
MxGatewaySession session = MxGatewaySession.forSessionId(client, "existing-session");
List<SubscribeResult> results = session.subscribeBulk(12, List.of("Area001.Pump001.Speed"));
assertEquals(34, results.get(0).getItemHandle());
assertEquals(MxCommandKind.MX_COMMAND_KIND_SUBSCRIBE_BULK, commandRequest.get().getCommand().getKind());
assertEquals(
List.of("Area001.Pump001.Speed"),
commandRequest.get().getCommand().getSubscribeBulk().getTagAddressesList());
}
}
@Test @Test
void streamCancellationCancelsServerCall() throws Exception { void streamCancellationCancelsServerCall() throws Exception {
CountDownLatch cancelled = new CountDownLatch(1); CountDownLatch cancelled = new CountDownLatch(1);
File diff suppressed because it is too large Load Diff
File diff suppressed because one or more lines are too long
+163 -1
View File
@@ -2,7 +2,7 @@
from __future__ import annotations from __future__ import annotations
from collections.abc import AsyncIterator from collections.abc import AsyncIterator, Sequence
from .errors import ensure_mxaccess_success from .errors import ensure_mxaccess_success
from .generated import mxaccess_gateway_pb2 as pb from .generated import mxaccess_gateway_pb2 as pb
@@ -89,6 +89,24 @@ class Session:
correlation_id=correlation_id, correlation_id=correlation_id,
) )
async def remove_item(
self,
server_handle: int,
item_handle: int,
*,
correlation_id: str = "",
) -> None:
await self.invoke(
pb.MxCommand(
kind=pb.MX_COMMAND_KIND_REMOVE_ITEM,
remove_item=pb.RemoveItemCommand(
server_handle=server_handle,
item_handle=item_handle,
),
),
correlation_id=correlation_id,
)
async def add_item( async def add_item(
self, self,
server_handle: int, server_handle: int,
@@ -147,6 +165,150 @@ class Session:
correlation_id=correlation_id, correlation_id=correlation_id,
) )
async def unadvise(
self,
server_handle: int,
item_handle: int,
*,
correlation_id: str = "",
) -> None:
await self.invoke(
pb.MxCommand(
kind=pb.MX_COMMAND_KIND_UN_ADVISE,
un_advise=pb.UnAdviseCommand(
server_handle=server_handle,
item_handle=item_handle,
),
),
correlation_id=correlation_id,
)
async def add_item_bulk(
self,
server_handle: int,
tag_addresses: Sequence[str],
*,
correlation_id: str = "",
) -> list[pb.SubscribeResult]:
if tag_addresses is None:
raise TypeError("tag_addresses is required")
reply = await self.invoke(
pb.MxCommand(
kind=pb.MX_COMMAND_KIND_ADD_ITEM_BULK,
add_item_bulk=pb.AddItemBulkCommand(
server_handle=server_handle,
tag_addresses=tag_addresses,
),
),
correlation_id=correlation_id,
)
return list(reply.add_item_bulk.results)
async def advise_item_bulk(
self,
server_handle: int,
item_handles: Sequence[int],
*,
correlation_id: str = "",
) -> list[pb.SubscribeResult]:
if item_handles is None:
raise TypeError("item_handles is required")
reply = await self.invoke(
pb.MxCommand(
kind=pb.MX_COMMAND_KIND_ADVISE_ITEM_BULK,
advise_item_bulk=pb.AdviseItemBulkCommand(
server_handle=server_handle,
item_handles=item_handles,
),
),
correlation_id=correlation_id,
)
return list(reply.advise_item_bulk.results)
async def remove_item_bulk(
self,
server_handle: int,
item_handles: Sequence[int],
*,
correlation_id: str = "",
) -> list[pb.SubscribeResult]:
if item_handles is None:
raise TypeError("item_handles is required")
reply = await self.invoke(
pb.MxCommand(
kind=pb.MX_COMMAND_KIND_REMOVE_ITEM_BULK,
remove_item_bulk=pb.RemoveItemBulkCommand(
server_handle=server_handle,
item_handles=item_handles,
),
),
correlation_id=correlation_id,
)
return list(reply.remove_item_bulk.results)
async def unadvise_item_bulk(
self,
server_handle: int,
item_handles: Sequence[int],
*,
correlation_id: str = "",
) -> list[pb.SubscribeResult]:
if item_handles is None:
raise TypeError("item_handles is required")
reply = await self.invoke(
pb.MxCommand(
kind=pb.MX_COMMAND_KIND_UN_ADVISE_ITEM_BULK,
un_advise_item_bulk=pb.UnAdviseItemBulkCommand(
server_handle=server_handle,
item_handles=item_handles,
),
),
correlation_id=correlation_id,
)
return list(reply.un_advise_item_bulk.results)
async def subscribe_bulk(
self,
server_handle: int,
tag_addresses: Sequence[str],
*,
correlation_id: str = "",
) -> list[pb.SubscribeResult]:
if tag_addresses is None:
raise TypeError("tag_addresses is required")
reply = await self.invoke(
pb.MxCommand(
kind=pb.MX_COMMAND_KIND_SUBSCRIBE_BULK,
subscribe_bulk=pb.SubscribeBulkCommand(
server_handle=server_handle,
tag_addresses=tag_addresses,
),
),
correlation_id=correlation_id,
)
return list(reply.subscribe_bulk.results)
async def unsubscribe_bulk(
self,
server_handle: int,
item_handles: Sequence[int],
*,
correlation_id: str = "",
) -> list[pb.SubscribeResult]:
if item_handles is None:
raise TypeError("item_handles is required")
reply = await self.invoke(
pb.MxCommand(
kind=pb.MX_COMMAND_KIND_UNSUBSCRIBE_BULK,
unsubscribe_bulk=pb.UnsubscribeBulkCommand(
server_handle=server_handle,
item_handles=item_handles,
),
),
correlation_id=correlation_id,
)
return list(reply.unsubscribe_bulk.results)
async def write( async def write(
self, self,
server_handle: int, server_handle: int,
@@ -58,6 +58,41 @@ async def test_mxaccess_error_preserves_raw_reply() -> None:
assert captured.value.raw_reply is failure_reply assert captured.value.raw_reply is failure_reply
@pytest.mark.asyncio
async def test_subscribe_bulk_sends_one_bulk_command_and_returns_results() -> None:
stub = FakeGatewayStub()
bulk_reply = pb.MxCommandReply(
session_id="session-1",
kind=pb.MX_COMMAND_KIND_SUBSCRIBE_BULK,
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
subscribe_bulk=pb.BulkSubscribeReply(
results=[
pb.SubscribeResult(
server_handle=12,
tag_address="Area001.Pump001.Speed",
item_handle=34,
was_successful=True,
),
],
),
)
stub.invoke.replies = [bulk_reply]
client = await GatewayClient.connect(
ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True),
stub=stub,
)
session = await client.open_session()
results = await session.subscribe_bulk(12, ["Area001.Pump001.Speed"])
assert results[0].item_handle == 34
assert len(stub.invoke.requests) == 1
assert stub.invoke.requests[0].command.kind == pb.MX_COMMAND_KIND_SUBSCRIBE_BULK
assert list(stub.invoke.requests[0].command.subscribe_bulk.tag_addresses) == [
"Area001.Pump001.Speed",
]
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_stream_events_cancels_underlying_call_when_closed() -> None: async def test_stream_events_cancels_underlying_call_when_closed() -> None:
stream = FakeStream( stream = FakeStream(
+172 -3
View File
@@ -3,9 +3,11 @@ use crate::error::Error;
use crate::generated::mxaccess_gateway::v1::mx_command::Payload; use crate::generated::mxaccess_gateway::v1::mx_command::Payload;
use crate::generated::mxaccess_gateway::v1::mx_command_reply; use crate::generated::mxaccess_gateway::v1::mx_command_reply;
use crate::generated::mxaccess_gateway::v1::{ use crate::generated::mxaccess_gateway::v1::{
AddItem2Command, AddItemCommand, AdviseCommand, CloseSessionRequest, MxCommand, MxCommandKind, AddItem2Command, AddItemBulkCommand, AddItemCommand, AdviseCommand, AdviseItemBulkCommand,
MxCommandReply, MxCommandRequest, MxValue as ProtoMxValue, OpenSessionRequest, RegisterCommand, CloseSessionRequest, MxCommand, MxCommandKind, MxCommandReply, MxCommandRequest,
StreamEventsRequest, Write2Command, WriteCommand, MxValue as ProtoMxValue, OpenSessionRequest, RegisterCommand, RemoveItemBulkCommand,
RemoveItemCommand, StreamEventsRequest, SubscribeBulkCommand, SubscribeResult, UnAdviseCommand,
UnAdviseItemBulkCommand, UnsubscribeBulkCommand, Write2Command, WriteCommand,
}; };
use crate::value::MxValue; use crate::value::MxValue;
@@ -94,6 +96,18 @@ impl Session {
Ok(add_item2_handle(&reply)) Ok(add_item2_handle(&reply))
} }
pub async fn remove_item(&self, server_handle: i32, item_handle: i32) -> Result<(), Error> {
self.invoke(
MxCommandKind::RemoveItem,
Payload::RemoveItem(RemoveItemCommand {
server_handle,
item_handle,
}),
)
.await?;
Ok(())
}
pub async fn advise(&self, server_handle: i32, item_handle: i32) -> Result<(), Error> { pub async fn advise(&self, server_handle: i32, item_handle: i32) -> Result<(), Error> {
self.invoke( self.invoke(
MxCommandKind::Advise, MxCommandKind::Advise,
@@ -106,6 +120,126 @@ impl Session {
Ok(()) Ok(())
} }
pub async fn un_advise(&self, server_handle: i32, item_handle: i32) -> Result<(), Error> {
self.invoke(
MxCommandKind::UnAdvise,
Payload::UnAdvise(UnAdviseCommand {
server_handle,
item_handle,
}),
)
.await?;
Ok(())
}
pub async fn add_item_bulk(
&self,
server_handle: i32,
tag_addresses: Vec<String>,
) -> Result<Vec<SubscribeResult>, Error> {
let reply = self
.invoke(
MxCommandKind::AddItemBulk,
Payload::AddItemBulk(AddItemBulkCommand {
server_handle,
tag_addresses,
}),
)
.await?;
Ok(bulk_results(reply, BulkReplyKind::AddItemBulk))
}
pub async fn advise_item_bulk(
&self,
server_handle: i32,
item_handles: Vec<i32>,
) -> Result<Vec<SubscribeResult>, Error> {
let reply = self
.invoke(
MxCommandKind::AdviseItemBulk,
Payload::AdviseItemBulk(AdviseItemBulkCommand {
server_handle,
item_handles,
}),
)
.await?;
Ok(bulk_results(reply, BulkReplyKind::AdviseItemBulk))
}
pub async fn remove_item_bulk(
&self,
server_handle: i32,
item_handles: Vec<i32>,
) -> Result<Vec<SubscribeResult>, Error> {
let reply = self
.invoke(
MxCommandKind::RemoveItemBulk,
Payload::RemoveItemBulk(RemoveItemBulkCommand {
server_handle,
item_handles,
}),
)
.await?;
Ok(bulk_results(reply, BulkReplyKind::RemoveItemBulk))
}
pub async fn un_advise_item_bulk(
&self,
server_handle: i32,
item_handles: Vec<i32>,
) -> Result<Vec<SubscribeResult>, Error> {
let reply = self
.invoke(
MxCommandKind::UnAdviseItemBulk,
Payload::UnAdviseItemBulk(UnAdviseItemBulkCommand {
server_handle,
item_handles,
}),
)
.await?;
Ok(bulk_results(reply, BulkReplyKind::UnAdviseItemBulk))
}
pub async fn subscribe_bulk(
&self,
server_handle: i32,
tag_addresses: Vec<String>,
) -> Result<Vec<SubscribeResult>, Error> {
let reply = self
.invoke(
MxCommandKind::SubscribeBulk,
Payload::SubscribeBulk(SubscribeBulkCommand {
server_handle,
tag_addresses,
}),
)
.await?;
Ok(bulk_results(reply, BulkReplyKind::SubscribeBulk))
}
pub async fn unsubscribe_bulk(
&self,
server_handle: i32,
item_handles: Vec<i32>,
) -> Result<Vec<SubscribeResult>, Error> {
let reply = self
.invoke(
MxCommandKind::UnsubscribeBulk,
Payload::UnsubscribeBulk(UnsubscribeBulkCommand {
server_handle,
item_handles,
}),
)
.await?;
Ok(bulk_results(reply, BulkReplyKind::UnsubscribeBulk))
}
pub async fn write( pub async fn write(
&self, &self,
server_handle: i32, server_handle: i32,
@@ -226,6 +360,41 @@ fn add_item2_handle(reply: &MxCommandReply) -> i32 {
} }
} }
enum BulkReplyKind {
AddItemBulk,
AdviseItemBulk,
RemoveItemBulk,
UnAdviseItemBulk,
SubscribeBulk,
UnsubscribeBulk,
}
fn bulk_results(reply: MxCommandReply, kind: BulkReplyKind) -> Vec<SubscribeResult> {
match (reply.payload, kind) {
(Some(mx_command_reply::Payload::AddItemBulk(reply)), BulkReplyKind::AddItemBulk) => {
reply.results
}
(Some(mx_command_reply::Payload::AdviseItemBulk(reply)), BulkReplyKind::AdviseItemBulk) => {
reply.results
}
(Some(mx_command_reply::Payload::RemoveItemBulk(reply)), BulkReplyKind::RemoveItemBulk) => {
reply.results
}
(
Some(mx_command_reply::Payload::UnAdviseItemBulk(reply)),
BulkReplyKind::UnAdviseItemBulk,
) => reply.results,
(Some(mx_command_reply::Payload::SubscribeBulk(reply)), BulkReplyKind::SubscribeBulk) => {
reply.results
}
(
Some(mx_command_reply::Payload::UnsubscribeBulk(reply)),
BulkReplyKind::UnsubscribeBulk,
) => reply.results,
_ => Vec::new(),
}
}
fn int32_reply_value(value: &ProtoMxValue) -> Option<i32> { fn int32_reply_value(value: &ProtoMxValue) -> Option<i32> {
match value.kind.as_ref()? { match value.kind.as_ref()? {
crate::generated::mxaccess_gateway::v1::mx_value::Kind::Int32Value(value) => Some(*value), crate::generated::mxaccess_gateway::v1::mx_value::Kind::Int32Value(value) => Some(*value),
+44 -4
View File
@@ -14,10 +14,10 @@ use mxgateway_client::generated::mxaccess_gateway::v1::mx_access_gateway_server:
use mxgateway_client::generated::mxaccess_gateway::v1::mx_command_reply; use mxgateway_client::generated::mxaccess_gateway::v1::mx_command_reply;
use mxgateway_client::generated::mxaccess_gateway::v1::mx_value::Kind; use mxgateway_client::generated::mxaccess_gateway::v1::mx_value::Kind;
use mxgateway_client::generated::mxaccess_gateway::v1::{ use mxgateway_client::generated::mxaccess_gateway::v1::{
AddItemReply, CloseSessionReply, CloseSessionRequest, MxCommandKind, MxCommandReply, AddItemReply, BulkSubscribeReply, CloseSessionReply, CloseSessionRequest, MxCommandKind,
MxDataType, MxEvent, MxEventFamily, MxStatusCategory, MxStatusProxy, MxStatusSource, MxValue, MxCommandReply, MxDataType, MxEvent, MxEventFamily, MxStatusCategory, MxStatusProxy,
OpenSessionReply, OpenSessionRequest, ProtocolStatus, ProtocolStatusCode, SessionState, MxStatusSource, MxValue, OpenSessionReply, OpenSessionRequest, ProtocolStatus,
StreamEventsRequest, ProtocolStatusCode, SessionState, StreamEventsRequest, SubscribeResult,
}; };
use mxgateway_client::{ use mxgateway_client::{
ApiKey, ClientOptions, CommandError, Error, GatewayClient, MxStatus, MxValue as ClientMxValue, ApiKey, ClientOptions, CommandError, Error, GatewayClient, MxStatus, MxValue as ClientMxValue,
@@ -87,6 +87,25 @@ async fn session_helpers_build_commands_and_preserve_command_errors() {
assert_eq!(error.reply().statuses.len(), 2); assert_eq!(error.reply().statuses.len(), 2);
} }
#[tokio::test]
async fn subscribe_bulk_builds_one_bulk_command_and_returns_results() {
let state = Arc::new(FakeState::default());
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let results = session
.subscribe_bulk(12, vec!["Area001.Pump001.Speed".to_owned()])
.await
.unwrap();
assert_eq!(results[0].item_handle, 34);
let last_command = state.last_command_kind.lock().await;
assert_eq!(*last_command, Some(MxCommandKind::SubscribeBulk as i32));
}
#[tokio::test] #[tokio::test]
async fn event_stream_preserves_order_and_drop_cancels_server_stream() { async fn event_stream_preserves_order_and_drop_cancels_server_stream() {
let state = Arc::new(FakeState::default()); let state = Arc::new(FakeState::default());
@@ -268,6 +287,27 @@ impl MxAccessGateway for FakeGateway {
return Ok(Response::new(mxaccess_failure_reply())); return Ok(Response::new(mxaccess_failure_reply()));
} }
if kind == MxCommandKind::SubscribeBulk as i32 {
return Ok(Response::new(MxCommandReply {
session_id: request.session_id,
correlation_id: "fake-correlation".to_owned(),
kind,
protocol_status: Some(ok_status("command ok")),
payload: Some(mx_command_reply::Payload::SubscribeBulk(
BulkSubscribeReply {
results: vec![SubscribeResult {
server_handle: 12,
tag_address: "Area001.Pump001.Speed".to_owned(),
item_handle: 34,
was_successful: true,
error_message: String::new(),
}],
},
)),
..MxCommandReply::default()
}));
}
Ok(Response::new(MxCommandReply { Ok(Response::new(MxCommandReply {
session_id: request.session_id, session_id: request.session_id,
correlation_id: "fake-correlation".to_owned(), correlation_id: "fake-correlation".to_owned(),
+13
View File
@@ -10,6 +10,19 @@ recreated by the contracts project build.
`MxAccessGateway` gRPC service, command payloads, command replies, event DTOs, `MxAccessGateway` gRPC service, command payloads, command replies, event DTOs,
`MxValue`, `MxArray`, and `MxStatusProxy`. `MxValue`, `MxArray`, and `MxStatusProxy`.
The public command model includes bulk subscription command kinds for
`AddItemBulk`, `AdviseItemBulk`, `RemoveItemBulk`, `UnAdviseItemBulk`,
`SubscribeBulk`, and `UnsubscribeBulk`. These commands are normal unary
`Invoke` payloads. They do not add separate gRPC methods, and they return a
`BulkSubscribeReply` containing per-item `SubscribeResult` records with
`ServerHandle`, `TagAddress`, `ItemHandle`, `WasSuccessful`, and
`ErrorMessage`.
The gateway forwards each bulk command as one worker command. The worker runs
the corresponding MXAccess `AddItem`, `Advise`, `UnAdvise`, and `RemoveItem`
calls sequentially on the session STA and preserves input order in the result
list.
`src/MxGateway.Contracts/Protos/mxaccess_worker.proto` defines the named-pipe `src/MxGateway.Contracts/Protos/mxaccess_worker.proto` defines the named-pipe
worker IPC envelope and control messages. It imports worker IPC envelope and control messages. It imports
`mxaccess_gateway.proto` so the worker and gateway use the same command, reply, `mxaccess_gateway.proto` so the worker and gateway use the same command, reply,
+20
View File
@@ -106,6 +106,26 @@ session.close()
client.close() client.close()
``` ```
Each language should expose the gateway bulk subscription commands with
idiomatic names:
```text
session.addItemBulk(serverHandle, tagAddresses)
session.adviseItemBulk(serverHandle, itemHandles)
session.removeItemBulk(serverHandle, itemHandles)
session.unAdviseItemBulk(serverHandle, itemHandles)
session.subscribeBulk(serverHandle, tagAddresses)
session.unsubscribeBulk(serverHandle, itemHandles)
```
These methods send one `Invoke` request using the matching bulk command kind.
They return the gateway `SubscribeResult` list without inventing client-only
handles. `SubscribeBulk` performs `AddItem` then `Advise` per tag inside the
worker session. `UnsubscribeBulk` performs `UnAdvise` then `RemoveItem` per item
handle. Per-item failures are returned in `SubscribeResult`; transport,
gateway, and cancellation failures still use each language's normal error
surface.
Each library should also expose lower-level calls: Each library should also expose lower-level calls:
```text ```text
+6
View File
@@ -83,6 +83,12 @@ public sealed class MxGatewaySession : IAsyncDisposable
public Task<int> AddItem2Async(int serverHandle, string item, string context, CancellationToken ct = default); public Task<int> AddItem2Async(int serverHandle, string item, string context, CancellationToken ct = default);
public Task AdviseAsync(int serverHandle, int itemHandle, CancellationToken ct = default); public Task AdviseAsync(int serverHandle, int itemHandle, CancellationToken ct = default);
public Task UnAdviseAsync(int serverHandle, int itemHandle, CancellationToken ct = default); public Task UnAdviseAsync(int serverHandle, int itemHandle, CancellationToken ct = default);
public Task<IReadOnlyList<SubscribeResult>> AddItemBulkAsync(int serverHandle, IReadOnlyList<string> tagAddresses, CancellationToken ct = default);
public Task<IReadOnlyList<SubscribeResult>> AdviseItemBulkAsync(int serverHandle, IReadOnlyList<int> itemHandles, CancellationToken ct = default);
public Task<IReadOnlyList<SubscribeResult>> RemoveItemBulkAsync(int serverHandle, IReadOnlyList<int> itemHandles, CancellationToken ct = default);
public Task<IReadOnlyList<SubscribeResult>> UnAdviseItemBulkAsync(int serverHandle, IReadOnlyList<int> itemHandles, CancellationToken ct = default);
public Task<IReadOnlyList<SubscribeResult>> SubscribeBulkAsync(int serverHandle, IReadOnlyList<string> tagAddresses, CancellationToken ct = default);
public Task<IReadOnlyList<SubscribeResult>> UnsubscribeBulkAsync(int serverHandle, IReadOnlyList<int> itemHandles, CancellationToken ct = default);
public Task WriteAsync(int serverHandle, int itemHandle, MxValue value, int userId, CancellationToken ct = default); public Task WriteAsync(int serverHandle, int itemHandle, MxValue value, int userId, CancellationToken ct = default);
public IAsyncEnumerable<MxEvent> StreamEventsAsync(CancellationToken ct = default); public IAsyncEnumerable<MxEvent> StreamEventsAsync(CancellationToken ct = default);
public Task CloseAsync(CancellationToken ct = default); public Task CloseAsync(CancellationToken ct = default);
+6
View File
@@ -74,6 +74,12 @@ func (s *Session) Unregister(ctx context.Context, serverHandle int32) error
func (s *Session) AddItem(ctx context.Context, serverHandle int32, item string) (int32, error) func (s *Session) AddItem(ctx context.Context, serverHandle int32, item string) (int32, error)
func (s *Session) AddItem2(ctx context.Context, serverHandle int32, item, context string) (int32, error) func (s *Session) AddItem2(ctx context.Context, serverHandle int32, item, context string) (int32, error)
func (s *Session) Advise(ctx context.Context, serverHandle, itemHandle int32) error func (s *Session) Advise(ctx context.Context, serverHandle, itemHandle int32) error
func (s *Session) AddItemBulk(ctx context.Context, serverHandle int32, tagAddresses []string) ([]*pb.SubscribeResult, error)
func (s *Session) AdviseItemBulk(ctx context.Context, serverHandle int32, itemHandles []int32) ([]*pb.SubscribeResult, error)
func (s *Session) RemoveItemBulk(ctx context.Context, serverHandle int32, itemHandles []int32) ([]*pb.SubscribeResult, error)
func (s *Session) UnAdviseItemBulk(ctx context.Context, serverHandle int32, itemHandles []int32) ([]*pb.SubscribeResult, error)
func (s *Session) SubscribeBulk(ctx context.Context, serverHandle int32, tagAddresses []string) ([]*pb.SubscribeResult, error)
func (s *Session) UnsubscribeBulk(ctx context.Context, serverHandle int32, itemHandles []int32) ([]*pb.SubscribeResult, error)
func (s *Session) Write(ctx context.Context, serverHandle, itemHandle int32, value Value, userID int32) error func (s *Session) Write(ctx context.Context, serverHandle, itemHandle int32, value Value, userID int32) error
func (s *Session) Events(ctx context.Context) (<-chan EventResult, error) func (s *Session) Events(ctx context.Context) (<-chan EventResult, error)
func (s *Session) Close(ctx context.Context) error func (s *Session) Close(ctx context.Context) error
+6
View File
@@ -64,6 +64,12 @@ public final class MxGatewaySession implements AutoCloseable {
public int addItem(int serverHandle, String item); public int addItem(int serverHandle, String item);
public int addItem2(int serverHandle, String item, String context); public int addItem2(int serverHandle, String item, String context);
public void advise(int serverHandle, int itemHandle); public void advise(int serverHandle, int itemHandle);
public List<SubscribeResult> addItemBulk(int serverHandle, List<String> tagAddresses);
public List<SubscribeResult> adviseItemBulk(int serverHandle, List<Integer> itemHandles);
public List<SubscribeResult> removeItemBulk(int serverHandle, List<Integer> itemHandles);
public List<SubscribeResult> unAdviseItemBulk(int serverHandle, List<Integer> itemHandles);
public List<SubscribeResult> subscribeBulk(int serverHandle, List<String> tagAddresses);
public List<SubscribeResult> unsubscribeBulk(int serverHandle, List<Integer> itemHandles);
public void write(int serverHandle, int itemHandle, MxValue value, int userId); public void write(int serverHandle, int itemHandle, MxValue value, int userId);
public Iterator<MxEvent> streamEvents(); public Iterator<MxEvent> streamEvents();
public void streamEventsAsync(StreamObserver<MxEvent> observer); public void streamEventsAsync(StreamObserver<MxEvent> observer);
+6
View File
@@ -82,6 +82,12 @@ class Session:
async def add_item(self, server_handle: int, item: str) -> int: ... async def add_item(self, server_handle: int, item: str) -> int: ...
async def add_item2(self, server_handle: int, item: str, context: str) -> int: ... async def add_item2(self, server_handle: int, item: str, context: str) -> int: ...
async def advise(self, server_handle: int, item_handle: int) -> None: ... async def advise(self, server_handle: int, item_handle: int) -> None: ...
async def add_item_bulk(self, server_handle: int, tag_addresses: Sequence[str]) -> list[SubscribeResult]: ...
async def advise_item_bulk(self, server_handle: int, item_handles: Sequence[int]) -> list[SubscribeResult]: ...
async def remove_item_bulk(self, server_handle: int, item_handles: Sequence[int]) -> list[SubscribeResult]: ...
async def unadvise_item_bulk(self, server_handle: int, item_handles: Sequence[int]) -> list[SubscribeResult]: ...
async def subscribe_bulk(self, server_handle: int, tag_addresses: Sequence[str]) -> list[SubscribeResult]: ...
async def unsubscribe_bulk(self, server_handle: int, item_handles: Sequence[int]) -> list[SubscribeResult]: ...
async def write(self, server_handle: int, item_handle: int, value: MxValueInput, user_id: int = 0) -> None: ... async def write(self, server_handle: int, item_handle: int, value: MxValueInput, user_id: int = 0) -> None: ...
async def stream_events(self) -> AsyncIterator[MxEvent]: ... async def stream_events(self) -> AsyncIterator[MxEvent]: ...
async def close(self) -> None: ... async def close(self) -> None: ...
+6
View File
@@ -81,6 +81,12 @@ impl Session {
pub async fn add_item(&self, server_handle: i32, item: &str) -> Result<i32, Error>; pub async fn add_item(&self, server_handle: i32, item: &str) -> Result<i32, Error>;
pub async fn add_item2(&self, server_handle: i32, item: &str, context: &str) -> Result<i32, Error>; pub async fn add_item2(&self, server_handle: i32, item: &str, context: &str) -> Result<i32, Error>;
pub async fn advise(&self, server_handle: i32, item_handle: i32) -> Result<(), Error>; pub async fn advise(&self, server_handle: i32, item_handle: i32) -> Result<(), Error>;
pub async fn add_item_bulk(&self, server_handle: i32, tag_addresses: Vec<String>) -> Result<Vec<SubscribeResult>, Error>;
pub async fn advise_item_bulk(&self, server_handle: i32, item_handles: Vec<i32>) -> Result<Vec<SubscribeResult>, Error>;
pub async fn remove_item_bulk(&self, server_handle: i32, item_handles: Vec<i32>) -> Result<Vec<SubscribeResult>, Error>;
pub async fn un_advise_item_bulk(&self, server_handle: i32, item_handles: Vec<i32>) -> Result<Vec<SubscribeResult>, Error>;
pub async fn subscribe_bulk(&self, server_handle: i32, tag_addresses: Vec<String>) -> Result<Vec<SubscribeResult>, Error>;
pub async fn unsubscribe_bulk(&self, server_handle: i32, item_handles: Vec<i32>) -> Result<Vec<SubscribeResult>, Error>;
pub async fn write(&self, server_handle: i32, item_handle: i32, value: MxValue, user_id: i32) -> Result<(), Error>; pub async fn write(&self, server_handle: i32, item_handle: i32, value: MxValue, user_id: i32) -> Result<(), Error>;
pub async fn events(&self) -> Result<impl Stream<Item = Result<MxEvent, Error>>, Error>; pub async fn events(&self) -> Result<impl Stream<Item = Result<MxEvent, Error>>, Error>;
pub async fn close(&self) -> Result<(), Error>; pub async fn close(&self) -> Result<(), Error>;
File diff suppressed because it is too large Load Diff
@@ -80,6 +80,12 @@ message MxCommand {
WriteSecured2Command write_secured2 = 25; WriteSecured2Command write_secured2 = 25;
AuthenticateUserCommand authenticate_user = 26; AuthenticateUserCommand authenticate_user = 26;
ArchestrAUserToIdCommand archestra_user_to_id = 27; ArchestrAUserToIdCommand archestra_user_to_id = 27;
AddItemBulkCommand add_item_bulk = 28;
AdviseItemBulkCommand advise_item_bulk = 29;
RemoveItemBulkCommand remove_item_bulk = 30;
UnAdviseItemBulkCommand un_advise_item_bulk = 31;
SubscribeBulkCommand subscribe_bulk = 32;
UnsubscribeBulkCommand unsubscribe_bulk = 33;
PingCommand ping = 100; PingCommand ping = 100;
GetSessionStateCommand get_session_state = 101; GetSessionStateCommand get_session_state = 101;
GetWorkerInfoCommand get_worker_info = 102; GetWorkerInfoCommand get_worker_info = 102;
@@ -108,6 +114,12 @@ enum MxCommandKind {
MX_COMMAND_KIND_WRITE_SECURED2 = 16; MX_COMMAND_KIND_WRITE_SECURED2 = 16;
MX_COMMAND_KIND_AUTHENTICATE_USER = 17; MX_COMMAND_KIND_AUTHENTICATE_USER = 17;
MX_COMMAND_KIND_ARCHESTRA_USER_TO_ID = 18; MX_COMMAND_KIND_ARCHESTRA_USER_TO_ID = 18;
MX_COMMAND_KIND_ADD_ITEM_BULK = 19;
MX_COMMAND_KIND_ADVISE_ITEM_BULK = 20;
MX_COMMAND_KIND_REMOVE_ITEM_BULK = 21;
MX_COMMAND_KIND_UN_ADVISE_ITEM_BULK = 22;
MX_COMMAND_KIND_SUBSCRIBE_BULK = 23;
MX_COMMAND_KIND_UNSUBSCRIBE_BULK = 24;
MX_COMMAND_KIND_PING = 100; MX_COMMAND_KIND_PING = 100;
MX_COMMAND_KIND_GET_SESSION_STATE = 101; MX_COMMAND_KIND_GET_SESSION_STATE = 101;
MX_COMMAND_KIND_GET_WORKER_INFO = 102; MX_COMMAND_KIND_GET_WORKER_INFO = 102;
@@ -224,6 +236,36 @@ message ArchestrAUserToIdCommand {
string user_id_guid = 2; string user_id_guid = 2;
} }
message AddItemBulkCommand {
int32 server_handle = 1;
repeated string tag_addresses = 2;
}
message AdviseItemBulkCommand {
int32 server_handle = 1;
repeated int32 item_handles = 2;
}
message RemoveItemBulkCommand {
int32 server_handle = 1;
repeated int32 item_handles = 2;
}
message UnAdviseItemBulkCommand {
int32 server_handle = 1;
repeated int32 item_handles = 2;
}
message SubscribeBulkCommand {
int32 server_handle = 1;
repeated string tag_addresses = 2;
}
message UnsubscribeBulkCommand {
int32 server_handle = 1;
repeated int32 item_handles = 2;
}
message PingCommand { message PingCommand {
string message = 1; string message = 1;
} }
@@ -264,6 +306,12 @@ message MxCommandReply {
ActivateReply activate = 25; ActivateReply activate = 25;
AuthenticateUserReply authenticate_user = 26; AuthenticateUserReply authenticate_user = 26;
ArchestrAUserToIdReply archestra_user_to_id = 27; ArchestrAUserToIdReply archestra_user_to_id = 27;
BulkSubscribeReply add_item_bulk = 28;
BulkSubscribeReply advise_item_bulk = 29;
BulkSubscribeReply remove_item_bulk = 30;
BulkSubscribeReply un_advise_item_bulk = 31;
BulkSubscribeReply subscribe_bulk = 32;
BulkSubscribeReply unsubscribe_bulk = 33;
SessionStateReply session_state = 100; SessionStateReply session_state = 100;
WorkerInfoReply worker_info = 101; WorkerInfoReply worker_info = 101;
DrainEventsReply drain_events = 102; DrainEventsReply drain_events = 102;
@@ -302,6 +350,18 @@ message ArchestrAUserToIdReply {
int32 user_id = 1; int32 user_id = 1;
} }
message SubscribeResult {
int32 server_handle = 1;
string tag_address = 2;
int32 item_handle = 3;
bool was_successful = 4;
string error_message = 5;
}
message BulkSubscribeReply {
repeated SubscribeResult results = 1;
}
message SessionStateReply { message SessionStateReply {
SessionState state = 1; SessionState state = 1;
} }
@@ -43,6 +43,7 @@ public sealed class MxAccessGatewayService(
reply.Capabilities.Add("unary-close-session"); reply.Capabilities.Add("unary-close-session");
reply.Capabilities.Add("unary-invoke"); reply.Capabilities.Add("unary-invoke");
reply.Capabilities.Add("server-stream-events"); reply.Capabilities.Add("server-stream-events");
reply.Capabilities.Add("bulk-subscribe-commands");
return reply; return reply;
} }
@@ -85,6 +85,12 @@ public sealed class MxAccessGrpcRequestValidator
MxCommandKind.WriteSecured2 => MxCommand.PayloadOneofCase.WriteSecured2, MxCommandKind.WriteSecured2 => MxCommand.PayloadOneofCase.WriteSecured2,
MxCommandKind.AuthenticateUser => MxCommand.PayloadOneofCase.AuthenticateUser, MxCommandKind.AuthenticateUser => MxCommand.PayloadOneofCase.AuthenticateUser,
MxCommandKind.ArchestraUserToId => MxCommand.PayloadOneofCase.ArchestraUserToId, MxCommandKind.ArchestraUserToId => MxCommand.PayloadOneofCase.ArchestraUserToId,
MxCommandKind.AddItemBulk => MxCommand.PayloadOneofCase.AddItemBulk,
MxCommandKind.AdviseItemBulk => MxCommand.PayloadOneofCase.AdviseItemBulk,
MxCommandKind.RemoveItemBulk => MxCommand.PayloadOneofCase.RemoveItemBulk,
MxCommandKind.UnAdviseItemBulk => MxCommand.PayloadOneofCase.UnAdviseItemBulk,
MxCommandKind.SubscribeBulk => MxCommand.PayloadOneofCase.SubscribeBulk,
MxCommandKind.UnsubscribeBulk => MxCommand.PayloadOneofCase.UnsubscribeBulk,
MxCommandKind.Ping => MxCommand.PayloadOneofCase.Ping, MxCommandKind.Ping => MxCommand.PayloadOneofCase.Ping,
MxCommandKind.GetSessionState => MxCommand.PayloadOneofCase.GetSessionState, MxCommandKind.GetSessionState => MxCommand.PayloadOneofCase.GetSessionState,
MxCommandKind.GetWorkerInfo => MxCommand.PayloadOneofCase.GetWorkerInfo, MxCommandKind.GetWorkerInfo => MxCommand.PayloadOneofCase.GetWorkerInfo,
@@ -247,6 +247,120 @@ public sealed class GatewaySession
return await workerClient.InvokeAsync(command, CommandTimeout, cancellationToken).ConfigureAwait(false); return await workerClient.InvokeAsync(command, CommandTimeout, cancellationToken).ConfigureAwait(false);
} }
public Task<IReadOnlyList<SubscribeResult>> AddItemBulkAsync(
int serverHandle,
IReadOnlyList<string> tagAddresses,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(tagAddresses);
AddItemBulkCommand bulkCommand = new() { ServerHandle = serverHandle };
bulkCommand.TagAddresses.Add(tagAddresses);
return InvokeBulkAsync(
new MxCommand
{
Kind = MxCommandKind.AddItemBulk,
AddItemBulk = bulkCommand,
},
reply => reply.AddItemBulk,
cancellationToken);
}
public Task<IReadOnlyList<SubscribeResult>> AdviseItemBulkAsync(
int serverHandle,
IReadOnlyList<int> itemHandles,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(itemHandles);
AdviseItemBulkCommand bulkCommand = new() { ServerHandle = serverHandle };
bulkCommand.ItemHandles.Add(itemHandles);
return InvokeBulkAsync(
new MxCommand
{
Kind = MxCommandKind.AdviseItemBulk,
AdviseItemBulk = bulkCommand,
},
reply => reply.AdviseItemBulk,
cancellationToken);
}
public Task<IReadOnlyList<SubscribeResult>> RemoveItemBulkAsync(
int serverHandle,
IReadOnlyList<int> itemHandles,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(itemHandles);
RemoveItemBulkCommand bulkCommand = new() { ServerHandle = serverHandle };
bulkCommand.ItemHandles.Add(itemHandles);
return InvokeBulkAsync(
new MxCommand
{
Kind = MxCommandKind.RemoveItemBulk,
RemoveItemBulk = bulkCommand,
},
reply => reply.RemoveItemBulk,
cancellationToken);
}
public Task<IReadOnlyList<SubscribeResult>> UnAdviseItemBulkAsync(
int serverHandle,
IReadOnlyList<int> itemHandles,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(itemHandles);
UnAdviseItemBulkCommand bulkCommand = new() { ServerHandle = serverHandle };
bulkCommand.ItemHandles.Add(itemHandles);
return InvokeBulkAsync(
new MxCommand
{
Kind = MxCommandKind.UnAdviseItemBulk,
UnAdviseItemBulk = bulkCommand,
},
reply => reply.UnAdviseItemBulk,
cancellationToken);
}
public Task<IReadOnlyList<SubscribeResult>> SubscribeBulkAsync(
int serverHandle,
IReadOnlyList<string> tagAddresses,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(tagAddresses);
SubscribeBulkCommand bulkCommand = new() { ServerHandle = serverHandle };
bulkCommand.TagAddresses.Add(tagAddresses);
return InvokeBulkAsync(
new MxCommand
{
Kind = MxCommandKind.SubscribeBulk,
SubscribeBulk = bulkCommand,
},
reply => reply.SubscribeBulk,
cancellationToken);
}
public Task<IReadOnlyList<SubscribeResult>> UnsubscribeBulkAsync(
int serverHandle,
IReadOnlyList<int> itemHandles,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(itemHandles);
UnsubscribeBulkCommand bulkCommand = new() { ServerHandle = serverHandle };
bulkCommand.ItemHandles.Add(itemHandles);
return InvokeBulkAsync(
new MxCommand
{
Kind = MxCommandKind.UnsubscribeBulk,
UnsubscribeBulk = bulkCommand,
},
reply => reply.UnsubscribeBulk,
cancellationToken);
}
public IAsyncEnumerable<WorkerEvent> ReadEventsAsync(CancellationToken cancellationToken) public IAsyncEnumerable<WorkerEvent> ReadEventsAsync(CancellationToken cancellationToken)
{ {
IWorkerClient workerClient = GetReadyWorkerClient(); IWorkerClient workerClient = GetReadyWorkerClient();
@@ -308,6 +422,35 @@ public sealed class GatewaySession
} }
} }
private async Task<IReadOnlyList<SubscribeResult>> InvokeBulkAsync(
MxCommand command,
Func<MxCommandReply, BulkSubscribeReply?> payloadAccessor,
CancellationToken cancellationToken)
{
WorkerCommandReply workerReply = await InvokeAsync(
new WorkerCommand { Command = command },
cancellationToken)
.ConfigureAwait(false);
MxCommandReply reply = workerReply.Reply ?? new MxCommandReply
{
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.ProtocolViolation,
Message = "Worker command reply did not contain a public reply payload.",
},
};
if (reply.ProtocolStatus?.Code is not ProtocolStatusCode.Ok)
{
string message = reply.ProtocolStatus?.Message ?? reply.DiagnosticMessage;
throw new SessionManagerException(
SessionManagerErrorCode.SessionNotReady,
string.IsNullOrWhiteSpace(message) ? "Bulk MXAccess command failed." : message);
}
return payloadAccessor(reply)?.Results.ToArray() ?? [];
}
private IWorkerClient GetReadyWorkerClient() private IWorkerClient GetReadyWorkerClient()
{ {
lock (_syncRoot) lock (_syncRoot)
@@ -48,6 +48,50 @@ public sealed class SessionManagerTests
Assert.Equal(MxCommandKind.Ping, reply.Reply.Kind); Assert.Equal(MxCommandKind.Ping, reply.Reply.Kind);
} }
[Fact]
public async Task GatewaySessionSubscribeBulkAsync_ForwardsOneBulkCommandAndReturnsResults()
{
FakeWorkerClient workerClient = new()
{
InvokeReply = new WorkerCommandReply
{
Reply = new MxCommandReply
{
SessionId = "session-1",
CorrelationId = "correlation-1",
Kind = MxCommandKind.SubscribeBulk,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
SubscribeBulk = new BulkSubscribeReply
{
Results =
{
new SubscribeResult
{
ServerHandle = 12,
TagAddress = "Galaxy.Tag.Value",
ItemHandle = 512,
WasSuccessful = true,
},
},
},
},
},
};
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient));
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
IReadOnlyList<SubscribeResult> results = await session.SubscribeBulkAsync(
12,
["Galaxy.Tag.Value"],
CancellationToken.None);
SubscribeResult result = Assert.Single(results);
Assert.Equal(512, result.ItemHandle);
Assert.Equal(1, workerClient.InvokeCount);
Assert.Equal(MxCommandKind.SubscribeBulk, workerClient.LastCommand?.Command.Kind);
Assert.Equal(["Galaxy.Tag.Value"], workerClient.LastCommand?.Command.SubscribeBulk.TagAddresses);
}
[Fact] [Fact]
public async Task InvokeAsync_WhenSessionFaulted_RejectsCommand() public async Task InvokeAsync_WhenSessionFaulted_RejectsCommand()
{ {
@@ -288,6 +332,10 @@ public sealed class SessionManagerTests
public Exception? ShutdownException { get; init; } public Exception? ShutdownException { get; init; }
public WorkerCommand? LastCommand { get; private set; }
public WorkerCommandReply? InvokeReply { get; init; }
public Task StartAsync(CancellationToken cancellationToken) public Task StartAsync(CancellationToken cancellationToken)
{ {
return Task.CompletedTask; return Task.CompletedTask;
@@ -299,6 +347,12 @@ public sealed class SessionManagerTests
CancellationToken cancellationToken) CancellationToken cancellationToken)
{ {
InvokeCount++; InvokeCount++;
LastCommand = command;
if (InvokeReply is not null)
{
return Task.FromResult(InvokeReply);
}
MxCommandKind kind = command.Command?.Kind ?? MxCommandKind.Unspecified; MxCommandKind kind = command.Command?.Kind ?? MxCommandKind.Unspecified;
return Task.FromResult(new WorkerCommandReply return Task.FromResult(new WorkerCommandReply
@@ -416,6 +416,74 @@ public sealed class MxAccessCommandExecutorTests
Assert.Equal(MxAccessAdviceKind.Plain, adviceHandle.AdviceKind); Assert.Equal(MxAccessAdviceKind.Plain, adviceHandle.AdviceKind);
} }
[Fact]
public async Task DispatchAsync_SubscribeBulk_RunsSequentialMxAccessCallsAndReturnsPerItemResults()
{
FakeMxAccessComObject fakeComObject = new(
registerHandle: 60,
addItemHandle: 512);
FakeMxAccessComObjectFactory factory = new(fakeComObject);
using StaRuntime runtime = CreateRuntime();
using MxAccessStaSession session = new(runtime, factory, new NoopEventSink());
await session.StartAsync(workerProcessId: 1234);
MxCommandReply reply = await session.DispatchAsync(CreateSubscribeBulkCommand(
"subscribe-bulk",
60,
["", "Galaxy.Tag.Value"]));
Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code);
Assert.Equal(MxCommandKind.SubscribeBulk, reply.Kind);
Assert.Collection(
reply.SubscribeBulk.Results,
result =>
{
Assert.False(result.WasSuccessful);
Assert.Equal(string.Empty, result.TagAddress);
Assert.Equal(0, result.ItemHandle);
Assert.Contains("required", result.ErrorMessage, StringComparison.OrdinalIgnoreCase);
},
result =>
{
Assert.True(result.WasSuccessful);
Assert.Equal("Galaxy.Tag.Value", result.TagAddress);
Assert.Equal(512, result.ItemHandle);
});
Assert.Equal(
["AddItem:60:Galaxy.Tag.Value", "Advise:60:512"],
fakeComObject.OperationNames);
Assert.Equal(runtime.StaThreadId, fakeComObject.AddItemThreadId);
Assert.Equal(runtime.StaThreadId, fakeComObject.AdviseThreadId);
}
[Fact]
public async Task DispatchAsync_UnsubscribeBulk_RemovesItemAfterUnAdviseFailure()
{
const int hresult = unchecked((int)0x80070057);
FakeMxAccessComObject fakeComObject = new(
registerHandle: 61,
unAdviseException: new COMException("Invalid item handle.", hresult));
FakeMxAccessComObjectFactory factory = new(fakeComObject);
using StaRuntime runtime = CreateRuntime();
using MxAccessStaSession session = new(runtime, factory, new NoopEventSink());
await session.StartAsync(workerProcessId: 1234);
MxCommandReply reply = await session.DispatchAsync(CreateUnsubscribeBulkCommand(
"unsubscribe-bulk",
61,
[513]));
Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code);
SubscribeResult result = Assert.Single(reply.UnsubscribeBulk.Results);
Assert.False(result.WasSuccessful);
Assert.Equal(513, result.ItemHandle);
Assert.Equal(string.Empty, result.TagAddress);
Assert.Contains("UnAdvise failed", result.ErrorMessage);
Assert.Equal(
["UnAdvise:61:513", "RemoveItem:61:513"],
fakeComObject.OperationNames);
}
[Fact] [Fact]
public async Task ShutdownGracefullyAsync_CleansHandlesInAdviceItemServerOrder() public async Task ShutdownGracefullyAsync_CleansHandlesInAdviceItemServerOrder()
{ {
@@ -658,6 +726,48 @@ public sealed class MxAccessCommandExecutorTests
}); });
} }
private static StaCommand CreateSubscribeBulkCommand(
string correlationId,
int serverHandle,
IEnumerable<string> tagAddresses)
{
SubscribeBulkCommand command = new()
{
ServerHandle = serverHandle,
};
command.TagAddresses.Add(tagAddresses);
return new StaCommand(
"session-1",
correlationId,
new MxCommand
{
Kind = MxCommandKind.SubscribeBulk,
SubscribeBulk = command,
});
}
private static StaCommand CreateUnsubscribeBulkCommand(
string correlationId,
int serverHandle,
IEnumerable<int> itemHandles)
{
UnsubscribeBulkCommand command = new()
{
ServerHandle = serverHandle,
};
command.ItemHandles.Add(itemHandles);
return new StaCommand(
"session-1",
correlationId,
new MxCommand
{
Kind = MxCommandKind.UnsubscribeBulk,
UnsubscribeBulk = command,
});
}
private static StaCommand CreateAdviseSupervisoryCommand( private static StaCommand CreateAdviseSupervisoryCommand(
string correlationId, string correlationId,
int serverHandle, int serverHandle,
@@ -1,4 +1,5 @@
using System; using System;
using System.Collections.Generic;
using MxGateway.Contracts.Proto; using MxGateway.Contracts.Proto;
using MxGateway.Worker.Conversion; using MxGateway.Worker.Conversion;
using MxGateway.Worker.Sta; using MxGateway.Worker.Sta;
@@ -40,6 +41,12 @@ public sealed class MxAccessCommandExecutor : IStaCommandExecutor
MxCommandKind.Advise => ExecuteAdvise(command), MxCommandKind.Advise => ExecuteAdvise(command),
MxCommandKind.UnAdvise => ExecuteUnAdvise(command), MxCommandKind.UnAdvise => ExecuteUnAdvise(command),
MxCommandKind.AdviseSupervisory => ExecuteAdviseSupervisory(command), MxCommandKind.AdviseSupervisory => ExecuteAdviseSupervisory(command),
MxCommandKind.AddItemBulk => ExecuteAddItemBulk(command),
MxCommandKind.AdviseItemBulk => ExecuteAdviseItemBulk(command),
MxCommandKind.RemoveItemBulk => ExecuteRemoveItemBulk(command),
MxCommandKind.UnAdviseItemBulk => ExecuteUnAdviseItemBulk(command),
MxCommandKind.SubscribeBulk => ExecuteSubscribeBulk(command),
MxCommandKind.UnsubscribeBulk => ExecuteUnsubscribeBulk(command),
_ => CreateInvalidRequestReply(command, $"Unsupported MXAccess command kind {command.Kind}."), _ => CreateInvalidRequestReply(command, $"Unsupported MXAccess command kind {command.Kind}."),
}; };
} }
@@ -178,6 +185,84 @@ public sealed class MxAccessCommandExecutor : IStaCommandExecutor
return CreateOkReply(command); return CreateOkReply(command);
} }
private MxCommandReply ExecuteAddItemBulk(StaCommand command)
{
if (command.Command.PayloadCase != MxCommand.PayloadOneofCase.AddItemBulk)
{
return CreateInvalidRequestReply(command, "AddItemBulk command payload is required.");
}
AddItemBulkCommand addItemBulkCommand = command.Command.AddItemBulk;
return CreateBulkReply(
command,
session.AddItemBulk(addItemBulkCommand.ServerHandle, addItemBulkCommand.TagAddresses));
}
private MxCommandReply ExecuteAdviseItemBulk(StaCommand command)
{
if (command.Command.PayloadCase != MxCommand.PayloadOneofCase.AdviseItemBulk)
{
return CreateInvalidRequestReply(command, "AdviseItemBulk command payload is required.");
}
AdviseItemBulkCommand adviseItemBulkCommand = command.Command.AdviseItemBulk;
return CreateBulkReply(
command,
session.AdviseItemBulk(adviseItemBulkCommand.ServerHandle, adviseItemBulkCommand.ItemHandles));
}
private MxCommandReply ExecuteRemoveItemBulk(StaCommand command)
{
if (command.Command.PayloadCase != MxCommand.PayloadOneofCase.RemoveItemBulk)
{
return CreateInvalidRequestReply(command, "RemoveItemBulk command payload is required.");
}
RemoveItemBulkCommand removeItemBulkCommand = command.Command.RemoveItemBulk;
return CreateBulkReply(
command,
session.RemoveItemBulk(removeItemBulkCommand.ServerHandle, removeItemBulkCommand.ItemHandles));
}
private MxCommandReply ExecuteUnAdviseItemBulk(StaCommand command)
{
if (command.Command.PayloadCase != MxCommand.PayloadOneofCase.UnAdviseItemBulk)
{
return CreateInvalidRequestReply(command, "UnAdviseItemBulk command payload is required.");
}
UnAdviseItemBulkCommand unAdviseItemBulkCommand = command.Command.UnAdviseItemBulk;
return CreateBulkReply(
command,
session.UnAdviseItemBulk(unAdviseItemBulkCommand.ServerHandle, unAdviseItemBulkCommand.ItemHandles));
}
private MxCommandReply ExecuteSubscribeBulk(StaCommand command)
{
if (command.Command.PayloadCase != MxCommand.PayloadOneofCase.SubscribeBulk)
{
return CreateInvalidRequestReply(command, "SubscribeBulk command payload is required.");
}
SubscribeBulkCommand subscribeBulkCommand = command.Command.SubscribeBulk;
return CreateBulkReply(
command,
session.SubscribeBulk(subscribeBulkCommand.ServerHandle, subscribeBulkCommand.TagAddresses));
}
private MxCommandReply ExecuteUnsubscribeBulk(StaCommand command)
{
if (command.Command.PayloadCase != MxCommand.PayloadOneofCase.UnsubscribeBulk)
{
return CreateInvalidRequestReply(command, "UnsubscribeBulk command payload is required.");
}
UnsubscribeBulkCommand unsubscribeBulkCommand = command.Command.UnsubscribeBulk;
return CreateBulkReply(
command,
session.UnsubscribeBulk(unsubscribeBulkCommand.ServerHandle, unsubscribeBulkCommand.ItemHandles));
}
private static MxCommandReply CreateOkReply(StaCommand command) private static MxCommandReply CreateOkReply(StaCommand command)
{ {
return new MxCommandReply return new MxCommandReply
@@ -194,6 +279,41 @@ public sealed class MxAccessCommandExecutor : IStaCommandExecutor
}; };
} }
private static MxCommandReply CreateBulkReply(
StaCommand command,
IEnumerable<SubscribeResult> results)
{
MxCommandReply reply = CreateOkReply(command);
BulkSubscribeReply bulkReply = new();
bulkReply.Results.Add(results);
switch (command.Kind)
{
case MxCommandKind.AddItemBulk:
reply.AddItemBulk = bulkReply;
break;
case MxCommandKind.AdviseItemBulk:
reply.AdviseItemBulk = bulkReply;
break;
case MxCommandKind.RemoveItemBulk:
reply.RemoveItemBulk = bulkReply;
break;
case MxCommandKind.UnAdviseItemBulk:
reply.UnAdviseItemBulk = bulkReply;
break;
case MxCommandKind.SubscribeBulk:
reply.SubscribeBulk = bulkReply;
break;
case MxCommandKind.UnsubscribeBulk:
reply.UnsubscribeBulk = bulkReply;
break;
default:
throw new InvalidOperationException($"Unsupported bulk command kind {command.Kind}.");
}
return reply;
}
private static MxCommandReply CreateInvalidRequestReply( private static MxCommandReply CreateInvalidRequestReply(
StaCommand command, StaCommand command,
string message) string message)
@@ -189,6 +189,202 @@ public sealed class MxAccessSession : IDisposable
MxAccessAdviceKind.Supervisory); MxAccessAdviceKind.Supervisory);
} }
public IReadOnlyList<SubscribeResult> AddItemBulk(
int serverHandle,
IEnumerable<string> tagAddresses)
{
ThrowIfDisposed();
if (tagAddresses is null)
{
throw new ArgumentNullException(nameof(tagAddresses));
}
List<SubscribeResult> results = new();
foreach (string? tagAddress in tagAddresses)
{
if (string.IsNullOrWhiteSpace(tagAddress))
{
results.Add(Failed(serverHandle, tagAddress ?? string.Empty, itemHandle: 0, "Tag address is required."));
continue;
}
try
{
int itemHandle = AddItem(serverHandle, tagAddress);
results.Add(Succeeded(serverHandle, tagAddress, itemHandle));
}
catch (Exception exception)
{
results.Add(Failed(serverHandle, tagAddress, itemHandle: 0, exception.Message));
}
}
return results;
}
public IReadOnlyList<SubscribeResult> AdviseItemBulk(
int serverHandle,
IEnumerable<int> itemHandles)
{
ThrowIfDisposed();
if (itemHandles is null)
{
throw new ArgumentNullException(nameof(itemHandles));
}
List<SubscribeResult> results = new();
foreach (int itemHandle in itemHandles)
{
try
{
Advise(serverHandle, itemHandle);
results.Add(Succeeded(serverHandle, string.Empty, itemHandle));
}
catch (Exception exception)
{
results.Add(Failed(serverHandle, string.Empty, itemHandle, exception.Message));
}
}
return results;
}
public IReadOnlyList<SubscribeResult> RemoveItemBulk(
int serverHandle,
IEnumerable<int> itemHandles)
{
ThrowIfDisposed();
if (itemHandles is null)
{
throw new ArgumentNullException(nameof(itemHandles));
}
List<SubscribeResult> results = new();
foreach (int itemHandle in itemHandles)
{
try
{
RemoveItem(serverHandle, itemHandle);
results.Add(Succeeded(serverHandle, string.Empty, itemHandle));
}
catch (Exception exception)
{
results.Add(Failed(serverHandle, string.Empty, itemHandle, exception.Message));
}
}
return results;
}
public IReadOnlyList<SubscribeResult> UnAdviseItemBulk(
int serverHandle,
IEnumerable<int> itemHandles)
{
ThrowIfDisposed();
if (itemHandles is null)
{
throw new ArgumentNullException(nameof(itemHandles));
}
List<SubscribeResult> results = new();
foreach (int itemHandle in itemHandles)
{
try
{
UnAdvise(serverHandle, itemHandle);
results.Add(Succeeded(serverHandle, string.Empty, itemHandle));
}
catch (Exception exception)
{
results.Add(Failed(serverHandle, string.Empty, itemHandle, exception.Message));
}
}
return results;
}
public IReadOnlyList<SubscribeResult> SubscribeBulk(
int serverHandle,
IEnumerable<string> tagAddresses)
{
ThrowIfDisposed();
if (tagAddresses is null)
{
throw new ArgumentNullException(nameof(tagAddresses));
}
List<SubscribeResult> results = new();
foreach (string? tagAddress in tagAddresses)
{
if (string.IsNullOrWhiteSpace(tagAddress))
{
results.Add(Failed(serverHandle, tagAddress ?? string.Empty, itemHandle: 0, "Tag address is required."));
continue;
}
int itemHandle = 0;
try
{
itemHandle = AddItem(serverHandle, tagAddress);
Advise(serverHandle, itemHandle);
results.Add(Succeeded(serverHandle, tagAddress, itemHandle));
}
catch (Exception exception)
{
string errorMessage = exception.Message;
if (itemHandle != 0)
{
errorMessage = AppendRemoveItemCleanup(serverHandle, itemHandle, errorMessage);
}
results.Add(Failed(serverHandle, tagAddress, itemHandle, errorMessage));
}
}
return results;
}
public IReadOnlyList<SubscribeResult> UnsubscribeBulk(
int serverHandle,
IEnumerable<int> itemHandles)
{
ThrowIfDisposed();
if (itemHandles is null)
{
throw new ArgumentNullException(nameof(itemHandles));
}
List<SubscribeResult> results = new();
foreach (int itemHandle in itemHandles)
{
List<string> errors = new();
try
{
UnAdvise(serverHandle, itemHandle);
}
catch (Exception exception)
{
errors.Add($"UnAdvise failed: {exception.Message}");
}
try
{
RemoveItem(serverHandle, itemHandle);
}
catch (Exception exception)
{
errors.Add($"RemoveItem failed: {exception.Message}");
}
results.Add(errors.Count == 0
? Succeeded(serverHandle, string.Empty, itemHandle)
: Failed(serverHandle, string.Empty, itemHandle, string.Join("; ", errors)));
}
return results;
}
public MxAccessShutdownResult ShutdownGracefully() public MxAccessShutdownResult ShutdownGracefully()
{ {
if (disposed) if (disposed)
@@ -290,6 +486,53 @@ public sealed class MxAccessSession : IDisposable
return ((long)serverHandle << 32) | (uint)itemHandle; return ((long)serverHandle << 32) | (uint)itemHandle;
} }
private string AppendRemoveItemCleanup(
int serverHandle,
int itemHandle,
string errorMessage)
{
try
{
RemoveItem(serverHandle, itemHandle);
return $"{errorMessage}; cleanup RemoveItem succeeded.";
}
catch (Exception cleanupException)
{
return $"{errorMessage}; cleanup RemoveItem failed: {cleanupException.Message}";
}
}
private static SubscribeResult Succeeded(
int serverHandle,
string tagAddress,
int itemHandle)
{
return new SubscribeResult
{
ServerHandle = serverHandle,
TagAddress = tagAddress,
ItemHandle = itemHandle,
WasSuccessful = true,
ErrorMessage = string.Empty,
};
}
private static SubscribeResult Failed(
int serverHandle,
string tagAddress,
int itemHandle,
string errorMessage)
{
return new SubscribeResult
{
ServerHandle = serverHandle,
TagAddress = tagAddress,
ItemHandle = itemHandle,
WasSuccessful = false,
ErrorMessage = errorMessage,
};
}
private void DisposeCore(ICollection<MxAccessShutdownFailure>? failures) private void DisposeCore(ICollection<MxAccessShutdownFailure>? failures)
{ {
try try