using System.Runtime.CompilerServices;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Contracts.Proto.Galaxy;
using ZB.MOM.WW.MxGateway.Server.Galaxy;
using ZB.MOM.WW.MxGateway.Server.Sessions;
using ZB.MOM.WW.MxGateway.Server.Workers;

namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Sessions;

/// <summary>
/// Integration coverage for the single outbound choke point
/// (<see cref="GatewaySession"/>'s <c>InvokeAsync</c>, which every test here goes through):
/// array AddItem addresses gain the writable <c>[]</c> suffix and sparse array writes are
/// expanded to whole-array values before any command reaches the worker.
/// </summary>
public sealed class GatewayArrayWriteWiringTests
{
    /// <summary>Session identifier shared by the session under test and the capturing worker stub.</summary>
    private const string TestSessionId = "session-array-write-wiring";

    /// <summary>
    /// A bare array AddItem address is normalized to its writable array form on the wire,
    /// and the normalized address lands in the tracked <see cref="SessionItemRegistration"/>.
    /// </summary>
    [Fact]
    public async Task AddItem_BareArrayAddress_NormalizedOnWireAndInRegistration()
    {
        CapturingWorkerClient worker = new();
        GatewaySession session = CreateReadySession(worker);
        WorkerCommand command = new()
        {
            Command = new MxCommand
            {
                Kind = MxCommandKind.AddItem,
                AddItem = new AddItemCommand
                {
                    ServerHandle = 1,
                    ItemDefinition = "Obj.Arr",
                },
            },
        };
        worker.NextReply = new WorkerCommandReply
        {
            Reply = new MxCommandReply
            {
                Kind = MxCommandKind.AddItem,
                ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
                AddItem = new AddItemReply { ItemHandle = 42 },
            },
        };

        await session.InvokeAsync(command, CancellationToken.None);

        Assert.Equal("Obj.Arr[]", ForwardedCommand(worker).AddItem.ItemDefinition);

        // Track the reply through the same path the gRPC service uses; the registration must carry
        // the normalized address even though the public reply is tracked from a separate command copy.
        MxCommand trackingCopy = new()
        {
            Kind = MxCommandKind.AddItem,
            AddItem = new AddItemCommand
            {
                ServerHandle = 1,
                ItemDefinition = "Obj.Arr",
            },
        };
        session.TrackCommandReply(trackingCopy, worker.NextReply.Reply);

        Assert.True(session.TryGetItemRegistration(1, 42, out SessionItemRegistration registration));
        Assert.Equal("Obj.Arr[]", registration.TagAddress);
    }

    /// <summary>A bare scalar AddItem address is forwarded unchanged.</summary>
    [Fact]
    public async Task AddItem_ScalarAddress_ForwardedUnchanged()
    {
        CapturingWorkerClient worker = new();
        GatewaySession session = CreateReadySession(worker);
        WorkerCommand command = new()
        {
            Command = new MxCommand
            {
                Kind = MxCommandKind.AddItem,
                AddItem = new AddItemCommand
                {
                    ServerHandle = 1,
                    ItemDefinition = "Obj.Scalar",
                },
            },
        };

        await session.InvokeAsync(command, CancellationToken.None);

        Assert.Equal("Obj.Scalar", ForwardedCommand(worker).AddItem.ItemDefinition);
    }

    /// <summary>
    /// A sparse-array <see cref="WriteCommand"/> value is expanded to a full, default-filled
    /// <c>ArrayValue</c> before reaching the worker; no sparse value is ever forwarded.
    /// </summary>
    [Fact]
    public async Task Write_SparseArrayValue_ExpandedBeforeReachingWorker()
    {
        CapturingWorkerClient worker = new();
        GatewaySession session = CreateReadySession(worker);
        WorkerCommand command = new()
        {
            Command = new MxCommand
            {
                Kind = MxCommandKind.Write,
                Write = new WriteCommand
                {
                    ServerHandle = 1,
                    ItemHandle = 42,
                    Value = SparseInt32Array(
                        new MxSparseElement { Index = 1, Value = new MxValue { Int32Value = 7 } }),
                },
            },
        };

        await session.InvokeAsync(command, CancellationToken.None);

        AssertExpandedInt32Array(ForwardedCommand(worker).Write.Value, 0, 7, 0, 0);
    }

    /// <summary>
    /// A bare array AddItem2 address is normalized to its writable array form on the wire,
    /// and the normalized address lands in the tracked <see cref="SessionItemRegistration"/>.
    /// </summary>
    [Fact]
    public async Task AddItem2_BareArrayAddress_NormalizedOnWireAndInRegistration()
    {
        CapturingWorkerClient worker = new();
        GatewaySession session = CreateReadySession(worker);
        WorkerCommand command = new()
        {
            Command = new MxCommand
            {
                Kind = MxCommandKind.AddItem2,
                AddItem2 = new AddItem2Command
                {
                    ServerHandle = 1,
                    ItemDefinition = "Obj.Arr",
                },
            },
        };
        worker.NextReply = new WorkerCommandReply
        {
            Reply = new MxCommandReply
            {
                Kind = MxCommandKind.AddItem2,
                ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
                AddItem2 = new AddItem2Reply { ItemHandle = 43 },
            },
        };

        await session.InvokeAsync(command, CancellationToken.None);

        Assert.Equal("Obj.Arr[]", ForwardedCommand(worker).AddItem2.ItemDefinition);

        // As with AddItem, the reply is tracked from a separate, un-normalized command copy.
        MxCommand trackingCopy = new()
        {
            Kind = MxCommandKind.AddItem2,
            AddItem2 = new AddItem2Command
            {
                ServerHandle = 1,
                ItemDefinition = "Obj.Arr",
            },
        };
        session.TrackCommandReply(trackingCopy, worker.NextReply.Reply);

        Assert.True(session.TryGetItemRegistration(1, 43, out SessionItemRegistration registration));
        Assert.Equal("Obj.Arr[]", registration.TagAddress);
    }

    /// <summary>
    /// A sparse-array entry in a <see cref="WriteBulkCommand"/> is expanded to a full,
    /// default-filled <c>ArrayValue</c> before reaching the worker; no sparse value is ever
    /// forwarded inside a bulk write.
    /// </summary>
    [Fact]
    public async Task WriteBulk_SparseArrayEntryValue_ExpandedBeforeReachingWorker()
    {
        CapturingWorkerClient worker = new();
        GatewaySession session = CreateReadySession(worker);
        WorkerCommand command = new()
        {
            Command = new MxCommand
            {
                Kind = MxCommandKind.WriteBulk,
                WriteBulk = new WriteBulkCommand
                {
                    ServerHandle = 1,
                    Entries =
                    {
                        new WriteBulkEntry
                        {
                            ItemHandle = 42,
                            Value = SparseInt32Array(
                                new MxSparseElement { Index = 1, Value = new MxValue { Int32Value = 7 } }),
                        },
                    },
                },
            },
        };

        await session.InvokeAsync(command, CancellationToken.None);

        AssertExpandedInt32Array(ForwardedCommand(worker).WriteBulk.Entries[0].Value, 0, 7, 0, 0);
    }

    /// <summary>
    /// A bare array address added via AddItemBulk is normalized to its writable array form
    /// on the wire (so the worker registers a write-capable handle), while a scalar address in the
    /// same batch is forwarded unchanged. Tracking the worker's echoed reply lands the normalized
    /// address in the <see cref="SessionItemRegistration"/>.
    /// </summary>
    [Fact]
    public async Task AddItemBulk_BareArrayAddress_NormalizedOnWireAndInRegistration()
    {
        CapturingWorkerClient worker = new();
        GatewaySession session = CreateReadySession(worker);
        WorkerCommand command = new()
        {
            Command = new MxCommand
            {
                Kind = MxCommandKind.AddItemBulk,
                AddItemBulk = new AddItemBulkCommand
                {
                    ServerHandle = 1,
                    TagAddresses = { "Obj.Arr", "Obj.Scalar" },
                },
            },
        };

        await session.InvokeAsync(command, CancellationToken.None);

        // The array address gains the writable "[]" suffix; the scalar passes through unchanged.
        Assert.Equal(
            new[] { "Obj.Arr[]", "Obj.Scalar" },
            ForwardedCommand(worker).AddItemBulk.TagAddresses);

        // The real worker echoes back the (suffixed) address it bound; tracking the reply must land the
        // normalized address in the registration so a later write resolves the write-capable handle.
        MxCommand trackingCopy = new()
        {
            Kind = MxCommandKind.AddItemBulk,
            AddItemBulk = new AddItemBulkCommand
            {
                ServerHandle = 1,
                TagAddresses = { "Obj.Arr", "Obj.Scalar" },
            },
        };
        MxCommandReply reply = new()
        {
            Kind = MxCommandKind.AddItemBulk,
            ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
            AddItemBulk = new BulkSubscribeReply
            {
                Results =
                {
                    new SubscribeResult
                    {
                        ServerHandle = 1,
                        ItemHandle = 50,
                        TagAddress = "Obj.Arr[]",
                        WasSuccessful = true,
                    },
                    new SubscribeResult
                    {
                        ServerHandle = 1,
                        ItemHandle = 51,
                        TagAddress = "Obj.Scalar",
                        WasSuccessful = true,
                    },
                },
            },
        };
        session.TrackCommandReply(trackingCopy, reply);

        Assert.True(session.TryGetItemRegistration(1, 50, out SessionItemRegistration arrayRegistration));
        Assert.Equal("Obj.Arr[]", arrayRegistration.TagAddress);
        Assert.True(session.TryGetItemRegistration(1, 51, out SessionItemRegistration scalarRegistration));
        Assert.Equal("Obj.Scalar", scalarRegistration.TagAddress);
    }

    /// <summary>
    /// A sparse-array <see cref="WriteSecuredCommand"/> value is expanded to a full,
    /// default-filled <c>ArrayValue</c> before reaching the worker; the secured variant's
    /// case arm in <c>NormalizeOutboundCommand</c> is wired to the expander.
    /// </summary>
    [Fact]
    public async Task WriteSecured_SparseArrayValue_ExpandedBeforeReachingWorker()
    {
        CapturingWorkerClient worker = new();
        GatewaySession session = CreateReadySession(worker);
        WorkerCommand command = new()
        {
            Command = new MxCommand
            {
                Kind = MxCommandKind.WriteSecured,
                WriteSecured = new WriteSecuredCommand
                {
                    ServerHandle = 1,
                    ItemHandle = 42,
                    Value = SparseInt32Array(
                        new MxSparseElement { Index = 2, Value = new MxValue { Int32Value = 9 } }),
                },
            },
        };

        await session.InvokeAsync(command, CancellationToken.None);

        AssertExpandedInt32Array(ForwardedCommand(worker).WriteSecured.Value, 0, 0, 9, 0);
    }

    /// <summary>
    /// A sparse-array <see cref="Write2Command"/> (timestamped) value is expanded to a full,
    /// default-filled <c>ArrayValue</c> before reaching the worker; the Write2 variant's
    /// case arm in <c>NormalizeOutboundCommand</c> is wired to the expander.
    /// </summary>
    [Fact]
    public async Task Write2_SparseArrayValue_ExpandedBeforeReachingWorker()
    {
        CapturingWorkerClient worker = new();
        GatewaySession session = CreateReadySession(worker);
        WorkerCommand command = new()
        {
            Command = new MxCommand
            {
                Kind = MxCommandKind.Write2,
                Write2 = new Write2Command
                {
                    ServerHandle = 1,
                    ItemHandle = 42,
                    Value = SparseInt32Array(
                        new MxSparseElement { Index = 3, Value = new MxValue { Int32Value = 5 } }),
                },
            },
        };

        await session.InvokeAsync(command, CancellationToken.None);

        AssertExpandedInt32Array(ForwardedCommand(worker).Write2.Value, 0, 0, 0, 5);
    }

    /// <summary>
    /// A sparse-array <see cref="WriteSecured2Command"/> (timestamped secured) value is expanded
    /// to a full, default-filled <c>ArrayValue</c> before reaching the worker; the
    /// WriteSecured2 variant's case arm in <c>NormalizeOutboundCommand</c> is wired to
    /// the expander.
    /// </summary>
    [Fact]
    public async Task WriteSecured2_SparseArrayValue_ExpandedBeforeReachingWorker()
    {
        CapturingWorkerClient worker = new();
        GatewaySession session = CreateReadySession(worker);
        WorkerCommand command = new()
        {
            Command = new MxCommand
            {
                Kind = MxCommandKind.WriteSecured2,
                WriteSecured2 = new WriteSecured2Command
                {
                    ServerHandle = 1,
                    ItemHandle = 42,
                    Value = SparseInt32Array(
                        new MxSparseElement { Index = 0, Value = new MxValue { Int32Value = 3 } }),
                },
            },
        };

        await session.InvokeAsync(command, CancellationToken.None);

        AssertExpandedInt32Array(ForwardedCommand(worker).WriteSecured2.Value, 3, 0, 0, 0);
    }

    /// <summary>
    /// A sparse-array entry in a <see cref="Write2BulkCommand"/> (timestamped) is expanded to a
    /// full, default-filled <c>ArrayValue</c> before reaching the worker; the Write2Bulk
    /// variant's case arm in <c>NormalizeOutboundCommand</c> is wired to the expander.
    /// </summary>
    [Fact]
    public async Task Write2Bulk_SparseArrayEntryValue_ExpandedBeforeReachingWorker()
    {
        CapturingWorkerClient worker = new();
        GatewaySession session = CreateReadySession(worker);
        WorkerCommand command = new()
        {
            Command = new MxCommand
            {
                Kind = MxCommandKind.Write2Bulk,
                Write2Bulk = new Write2BulkCommand
                {
                    ServerHandle = 1,
                    Entries =
                    {
                        new Write2BulkEntry
                        {
                            ItemHandle = 42,
                            Value = SparseInt32Array(
                                new MxSparseElement { Index = 1, Value = new MxValue { Int32Value = 11 } }),
                        },
                    },
                },
            },
        };

        await session.InvokeAsync(command, CancellationToken.None);

        AssertExpandedInt32Array(ForwardedCommand(worker).Write2Bulk.Entries[0].Value, 0, 11, 0, 0);
    }

    /// <summary>
    /// A sparse-array entry in a <see cref="WriteSecuredBulkCommand"/> is expanded to a full,
    /// default-filled <c>ArrayValue</c> before reaching the worker; the WriteSecuredBulk
    /// variant's case arm in <c>NormalizeOutboundCommand</c> is wired to the expander.
    /// </summary>
    [Fact]
    public async Task WriteSecuredBulk_SparseArrayEntryValue_ExpandedBeforeReachingWorker()
    {
        CapturingWorkerClient worker = new();
        GatewaySession session = CreateReadySession(worker);
        WorkerCommand command = new()
        {
            Command = new MxCommand
            {
                Kind = MxCommandKind.WriteSecuredBulk,
                WriteSecuredBulk = new WriteSecuredBulkCommand
                {
                    ServerHandle = 1,
                    Entries =
                    {
                        new WriteSecuredBulkEntry
                        {
                            ItemHandle = 42,
                            Value = SparseInt32Array(
                                new MxSparseElement { Index = 2, Value = new MxValue { Int32Value = 13 } }),
                        },
                    },
                },
            },
        };

        await session.InvokeAsync(command, CancellationToken.None);

        AssertExpandedInt32Array(ForwardedCommand(worker).WriteSecuredBulk.Entries[0].Value, 0, 0, 13, 0);
    }

    /// <summary>
    /// A sparse-array entry in a <see cref="WriteSecured2BulkCommand"/> (timestamped secured) is
    /// expanded to a full, default-filled <c>ArrayValue</c> before reaching the worker; the
    /// WriteSecured2Bulk variant's case arm in <c>NormalizeOutboundCommand</c> is wired to
    /// the expander.
    /// </summary>
    [Fact]
    public async Task WriteSecured2Bulk_SparseArrayEntryValue_ExpandedBeforeReachingWorker()
    {
        CapturingWorkerClient worker = new();
        GatewaySession session = CreateReadySession(worker);
        WorkerCommand command = new()
        {
            Command = new MxCommand
            {
                Kind = MxCommandKind.WriteSecured2Bulk,
                WriteSecured2Bulk = new WriteSecured2BulkCommand
                {
                    ServerHandle = 1,
                    Entries =
                    {
                        new WriteSecured2BulkEntry
                        {
                            ItemHandle = 42,
                            Value = SparseInt32Array(
                                new MxSparseElement { Index = 3, Value = new MxValue { Int32Value = 17 } }),
                        },
                    },
                },
            },
        };

        await session.InvokeAsync(command, CancellationToken.None);

        AssertExpandedInt32Array(ForwardedCommand(worker).WriteSecured2Bulk.Entries[0].Value, 0, 0, 0, 17);
    }

    /// <summary>
    /// A bare array address added via AddBufferedItem is normalized to its writable array
    /// form on the wire and in the tracked <see cref="SessionItemRegistration"/>.
    /// </summary>
    [Fact]
    public async Task AddBufferedItem_BareArrayAddress_NormalizedOnWireAndInRegistration()
    {
        CapturingWorkerClient worker = new();
        GatewaySession session = CreateReadySession(worker);
        WorkerCommand command = new()
        {
            Command = new MxCommand
            {
                Kind = MxCommandKind.AddBufferedItem,
                AddBufferedItem = new AddBufferedItemCommand
                {
                    ServerHandle = 1,
                    ItemDefinition = "Obj.Arr",
                },
            },
        };
        worker.NextReply = new WorkerCommandReply
        {
            Reply = new MxCommandReply
            {
                Kind = MxCommandKind.AddBufferedItem,
                ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
                AddBufferedItem = new AddBufferedItemReply { ItemHandle = 60 },
            },
        };

        await session.InvokeAsync(command, CancellationToken.None);

        Assert.Equal("Obj.Arr[]", ForwardedCommand(worker).AddBufferedItem.ItemDefinition);

        // AddBufferedItem tracking keys off the command's ItemDefinition (the reply carries no address),
        // so the tracking-path normalization must run for the registration to match the bound handle.
        MxCommand trackingCopy = new()
        {
            Kind = MxCommandKind.AddBufferedItem,
            AddBufferedItem = new AddBufferedItemCommand
            {
                ServerHandle = 1,
                ItemDefinition = "Obj.Arr",
            },
        };
        session.TrackCommandReply(trackingCopy, worker.NextReply.Reply);

        Assert.True(session.TryGetItemRegistration(1, 60, out SessionItemRegistration registration));
        Assert.Equal("Obj.Arr[]", registration.TagAddress);
    }

    /// <summary>
    /// Returns the command most recently captured by <paramref name="worker"/>, failing the test
    /// with an assertion (rather than a <see cref="NullReferenceException"/>) when nothing was forwarded.
    /// </summary>
    /// <param name="worker">The capturing worker stub attached to the session under test.</param>
    /// <returns>The <see cref="MxCommand"/> payload of the last forwarded <see cref="WorkerCommand"/>.</returns>
    private static MxCommand ForwardedCommand(CapturingWorkerClient worker)
    {
        Assert.NotNull(worker.LastCommand);
        return worker.LastCommand!.Command;
    }

    /// <summary>
    /// Builds the sparse-array fixture shared by the write tests: an integer sparse array with a
    /// total length of four that carries only <paramref name="element"/>.
    /// </summary>
    /// <param name="element">The single populated element (index and value) of the sparse array.</param>
    /// <returns>An <see cref="MxValue"/> whose <c>SparseArrayValue</c> is set.</returns>
    private static MxValue SparseInt32Array(MxSparseElement element) => new()
    {
        SparseArrayValue = new MxSparseArray
        {
            ElementDataType = MxDataType.Integer,
            TotalLength = 4,
            Elements = { element },
        },
    };

    /// <summary>
    /// Asserts that <paramref name="forwarded"/> carries a whole <c>ArrayValue</c> (not a sparse
    /// value) whose Int32 elements equal <paramref name="expected"/>.
    /// </summary>
    /// <param name="forwarded">The value captured on the worker side.</param>
    /// <param name="expected">The expected elements, in order.</param>
    private static void AssertExpandedInt32Array(MxValue forwarded, params int[] expected)
    {
        Assert.Equal(MxValue.KindOneofCase.ArrayValue, forwarded.KindCase);
        Assert.Equal(expected, forwarded.ArrayValue.Int32Values.Values);
    }

    /// <summary>
    /// Creates a <see cref="GatewaySession"/> that uses the test normalizer, attaches
    /// <paramref name="workerClient"/> and marks the session ready.
    /// </summary>
    /// <param name="workerClient">The worker client the session forwards commands to.</param>
    /// <returns>The session, after <c>AttachWorkerClient</c> and <c>MarkReady</c> have been called.</returns>
    private static GatewaySession CreateReadySession(IWorkerClient workerClient)
    {
        GatewaySession session = new(
            sessionId: TestSessionId,
            backendName: "mxaccess",
            pipeName: "mxaccess-gateway-1-session-array-write-wiring",
            nonce: "nonce",
            clientIdentity: "client-1",
            ownerKeyId: null,
            clientSessionName: "test-session",
            clientCorrelationId: "client-correlation-1",
            commandTimeout: TimeSpan.FromSeconds(5),
            startupTimeout: TimeSpan.FromSeconds(5),
            shutdownTimeout: TimeSpan.FromSeconds(5),
            leaseDuration: TimeSpan.FromMinutes(30),
            openedAt: DateTimeOffset.UtcNow,
            addressNormalizer: CreateNormalizer());
        session.AttachWorkerClient(workerClient);
        session.MarkReady();
        return session;
    }

    /// <summary>
    /// Creates an <see cref="ArrayAddressNormalizer"/> backed by a stub hierarchy cache holding one
    /// object, <c>Obj</c>, with an array attribute <c>Arr</c> and a scalar attribute <c>Scalar</c>.
    /// </summary>
    /// <returns>The normalizer used by every session in this test class.</returns>
    private static ArrayAddressNormalizer CreateNormalizer()
    {
        IReadOnlyList<GalaxyObject> objects =
        [
            new GalaxyObject
            {
                GobjectId = 1,
                TagName = "Obj",
                ContainedName = "Obj",
                Attributes =
                {
                    new GalaxyAttribute
                    {
                        AttributeName = "Arr",
                        FullTagReference = "Obj.Arr[]",
                        IsArray = true,
                    },
                    new GalaxyAttribute
                    {
                        AttributeName = "Scalar",
                        FullTagReference = "Obj.Scalar",
                        IsArray = false,
                    },
                },
            },
        ];

        GalaxyHierarchyCacheEntry entry = GalaxyHierarchyCacheEntry.Empty with
        {
            Status = GalaxyCacheStatus.Healthy,
            Objects = objects,
            Index = GalaxyHierarchyIndex.Build(objects),
        };

        return new ArrayAddressNormalizer(new StubGalaxyHierarchyCache(entry));
    }

    /// <summary>Hierarchy cache stub that always exposes the entry it was constructed with.</summary>
    private sealed class StubGalaxyHierarchyCache(GalaxyHierarchyCacheEntry current) : IGalaxyHierarchyCache
    {
        /// <summary>Gets the current cache entry.</summary>
        public GalaxyHierarchyCacheEntry Current { get; } = current;

        /// <inheritdoc />
        public Task RefreshAsync(CancellationToken cancellationToken) => Task.CompletedTask;

        /// <inheritdoc />
        public Task WaitForFirstLoadAsync(CancellationToken cancellationToken) => Task.CompletedTask;
    }

    /// <summary>
    /// Worker client stub that records the last command it was asked to invoke and answers with
    /// <see cref="NextReply"/>.
    /// </summary>
    private sealed class CapturingWorkerClient : IWorkerClient
    {
        /// <summary>Gets the most recent command forwarded to the worker.</summary>
        public WorkerCommand? LastCommand { get; private set; }

        /// <summary>Gets or sets the reply returned by the next invocation.</summary>
        public WorkerCommandReply NextReply { get; set; } = new();

        /// <summary>Gets the session identifier.</summary>
        public string SessionId { get; } = TestSessionId;

        /// <summary>Gets the worker process identifier.</summary>
        public int? ProcessId { get; } = 1234;

        /// <summary>Gets the worker client state.</summary>
        public WorkerClientState State { get; } = WorkerClientState.Ready;

        /// <summary>Gets the last recorded heartbeat timestamp.</summary>
        public DateTimeOffset LastHeartbeatAt { get; } = DateTimeOffset.UtcNow;

        /// <inheritdoc />
        public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;

        /// <inheritdoc />
        public Task<WorkerCommandReply> InvokeAsync(
            WorkerCommand command,
            TimeSpan timeout,
            CancellationToken cancellationToken)
        {
            LastCommand = command;
            return Task.FromResult(NextReply);
        }

        /// <inheritdoc />
        // NOTE(review): the element type argument is not visible in the reviewed source;
        // WorkerEvent is presumed — confirm against IWorkerClient.ReadEventsAsync.
        public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
            [EnumeratorCancellation] CancellationToken cancellationToken)
        {
            // The await only keeps the async iterator well-formed; the stream is always empty.
            await Task.CompletedTask.ConfigureAwait(false);
            yield break;
        }

        /// <inheritdoc />
        public Task ShutdownAsync(TimeSpan timeout, CancellationToken cancellationToken) => Task.CompletedTask;

        /// <inheritdoc />
        public void Kill(string reason)
        {
        }

        /// <inheritdoc />
        public ValueTask DisposeAsync() => ValueTask.CompletedTask;
    }
}