From f0ef7ea0a85e02a933d1f5317a947211c7ded884 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 18 Jun 2026 03:10:13 -0400 Subject: [PATCH] feat(gateway): normalize array AddItem suffix and expand sparse writes at the worker boundary --- .../Sessions/GatewaySession.cs | 103 ++++++- .../Sessions/SessionManager.cs | 13 +- .../SessionServiceCollectionExtensions.cs | 3 + .../Sessions/GatewayArrayWriteWiringTests.cs | 267 ++++++++++++++++++ 4 files changed, 381 insertions(+), 5 deletions(-) create mode 100644 src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewayArrayWriteWiringTests.cs diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs index a5bf1c6..67127e7 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs @@ -38,6 +38,7 @@ public sealed class GatewaySession private Task? _dashboardMirrorTask; private CancellationTokenSource? _dashboardMirrorCts; private readonly Dictionary<(int ServerHandle, int ItemHandle), SessionItemRegistration> _items = []; + private readonly ArrayAddressNormalizer? _addressNormalizer; /// /// Initializes a gateway session with session metadata and timeout configuration. @@ -133,6 +134,12 @@ public sealed class GatewaySession /// fast immediately. (the default) disables the wait and /// preserves the original fail-fast behavior byte-for-byte. /// + /// + /// Rewrites bare array AddItem/AddItem2 addresses to their writable [] + /// form using Galaxy metadata at the outbound choke point (and on registration tracking). + /// When (legacy unit-construction paths that do not exercise Galaxy + /// metadata), addresses pass through unchanged. + /// public GatewaySession( string sessionId, string backendName, @@ -149,7 +156,8 @@ public sealed class GatewaySession DateTimeOffset openedAt, SessionEventStreaming? eventStreaming = null, TimeSpan detachGrace = default, - TimeSpan workerReadyWaitTimeout = default) + TimeSpan workerReadyWaitTimeout = default, + ArrayAddressNormalizer? addressNormalizer = null) { if (string.IsNullOrWhiteSpace(sessionId)) { @@ -189,6 +197,7 @@ public sealed class GatewaySession _eventStreaming = eventStreaming ?? SessionEventStreaming.Default; _detachGrace = detachGrace > TimeSpan.Zero ? detachGrace : TimeSpan.Zero; _workerReadyWaitTimeout = workerReadyWaitTimeout > TimeSpan.Zero ? workerReadyWaitTimeout : TimeSpan.Zero; + _addressNormalizer = addressNormalizer; } /// @@ -948,12 +957,95 @@ public sealed class GatewaySession WorkerCommand command, CancellationToken cancellationToken) { + ArgumentNullException.ThrowIfNull(command); + if (command.Command is not null) + { + NormalizeOutboundCommand(command.Command); + } + IWorkerClient workerClient = await GetReadyWorkerClientAsync(cancellationToken).ConfigureAwait(false); TouchClientActivity(_eventStreaming.TimeProvider.GetUtcNow()); return await workerClient.InvokeAsync(command, CommandTimeout, cancellationToken).ConfigureAwait(false); } + // Single outbound choke point for the two array-write ergonomics shims (Task 3): + // 1. AddItem/AddItem2 array addresses gain the writable "[]" suffix when Galaxy metadata + // reports them as arrays, so the worker registers a write-capable handle. The mutation + // lands on the same MxCommand instance forwarded to the worker. + // 2. Sparse array write values are expanded to whole-array values, because MXAccess has no + // partial-array write primitive — the worker only ever sees a full MxArray. + // SparseArrayExpander.Expand throws RpcException(InvalidArgument) for an invalid sparse payload; + // that propagates out of InvokeAsync as the desired client-facing error and is deliberately not + // caught here. + private void NormalizeOutboundCommand(MxCommand command) + { + switch (command.PayloadCase) + { + case MxCommand.PayloadOneofCase.AddItem: + command.AddItem.ItemDefinition = NormalizeAddress(command.AddItem.ItemDefinition); + break; + case MxCommand.PayloadOneofCase.AddItem2: + command.AddItem2.ItemDefinition = NormalizeAddress(command.AddItem2.ItemDefinition); + break; + case MxCommand.PayloadOneofCase.Write: + ExpandValue(command.Write.Value); + break; + case MxCommand.PayloadOneofCase.WriteSecured: + ExpandValue(command.WriteSecured.Value); + break; + case MxCommand.PayloadOneofCase.Write2: + ExpandValue(command.Write2.Value); + break; + case MxCommand.PayloadOneofCase.WriteSecured2: + ExpandValue(command.WriteSecured2.Value); + break; + case MxCommand.PayloadOneofCase.WriteBulk: + foreach (WriteBulkEntry entry in command.WriteBulk.Entries) + { + ExpandValue(entry.Value); + } + + break; + case MxCommand.PayloadOneofCase.Write2Bulk: + foreach (Write2BulkEntry entry in command.Write2Bulk.Entries) + { + ExpandValue(entry.Value); + } + + break; + case MxCommand.PayloadOneofCase.WriteSecuredBulk: + foreach (WriteSecuredBulkEntry entry in command.WriteSecuredBulk.Entries) + { + ExpandValue(entry.Value); + } + + break; + case MxCommand.PayloadOneofCase.WriteSecured2Bulk: + foreach (WriteSecured2BulkEntry entry in command.WriteSecured2Bulk.Entries) + { + ExpandValue(entry.Value); + } + + break; + } + } + + // Best-effort array-suffix rewrite; the normalizer is null in legacy unit-construction paths + // that do not exercise Galaxy metadata, in which case the address passes through unchanged. + private string NormalizeAddress(string address) => + _addressNormalizer?.Normalize(address) ?? address; + + // MXAccess writes replace the whole array; expand a sparse value in place so the worker only + // ever receives a whole-array MxValue. No-op for null or non-sparse values. + private static void ExpandValue(MxValue? value) + { + if (value is not null) + { + SparseArrayExpander.Expand(value); + } + } + /// Gets the item registration for a server and item handle pair. /// The MXAccess server handle. /// The MXAccess item handle. @@ -985,11 +1077,16 @@ public sealed class GatewaySession { switch (command.Kind) { + // The public reply is tracked from the pre-mapping MxCommand instance, which is a + // separate copy from the one mutated at the InvokeAsync choke point (the gRPC mapper + // deep-clones before forwarding). Re-apply the array-suffix normalization here so the + // registration's TagAddress matches the address the worker actually registered. + // Normalize is idempotent for an already-suffixed address. case MxCommandKind.AddItem when reply.AddItem is not null: - TrackItem(command.AddItem.ServerHandle, reply.AddItem.ItemHandle, command.AddItem.ItemDefinition); + TrackItem(command.AddItem.ServerHandle, reply.AddItem.ItemHandle, NormalizeAddress(command.AddItem.ItemDefinition)); break; case MxCommandKind.AddItem2 when reply.AddItem2 is not null: - TrackItem(command.AddItem2.ServerHandle, reply.AddItem2.ItemHandle, command.AddItem2.ItemDefinition); + TrackItem(command.AddItem2.ServerHandle, reply.AddItem2.ItemHandle, NormalizeAddress(command.AddItem2.ItemDefinition)); break; case MxCommandKind.AddBufferedItem when reply.AddBufferedItem is not null: TrackItem(command.AddBufferedItem.ServerHandle, reply.AddBufferedItem.ItemHandle, command.AddBufferedItem.ItemDefinition); diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManager.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManager.cs index 9982794..f9d6432 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManager.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManager.cs @@ -29,6 +29,7 @@ public sealed class SessionManager : ISessionManager private readonly Grpc.MxAccessGrpcMapper _eventMapper; private readonly ILogger _distributorLogger; private readonly Dashboard.Hubs.IDashboardEventBroadcaster? _dashboardEventBroadcaster; + private readonly ArrayAddressNormalizer? _addressNormalizer; /// /// Initializes a new instance of . @@ -47,6 +48,11 @@ public sealed class SessionManager : ISessionManager /// dashboard receives events regardless of whether a gRPC client is streaming. Null in /// unit tests that do not exercise the dashboard mirror. /// + /// + /// Rewrites bare array AddItem addresses to their writable [] form using Galaxy + /// metadata; handed to each session so the normalization runs at the outbound choke point. + /// Null in unit tests that do not exercise array-write ergonomics. + /// public SessionManager( ISessionRegistry registry, ISessionWorkerClientFactory workerClientFactory, @@ -56,7 +62,8 @@ public sealed class SessionManager : ISessionManager ILogger? logger = null, Grpc.MxAccessGrpcMapper? eventMapper = null, ILogger? distributorLogger = null, - Dashboard.Hubs.IDashboardEventBroadcaster? dashboardEventBroadcaster = null) + Dashboard.Hubs.IDashboardEventBroadcaster? dashboardEventBroadcaster = null, + ArrayAddressNormalizer? addressNormalizer = null) { _registry = registry ?? throw new ArgumentNullException(nameof(registry)); _workerClientFactory = workerClientFactory ?? throw new ArgumentNullException(nameof(workerClientFactory)); @@ -67,6 +74,7 @@ public sealed class SessionManager : ISessionManager _eventMapper = eventMapper ?? new Grpc.MxAccessGrpcMapper(); _distributorLogger = distributorLogger ?? NullLogger.Instance; _dashboardEventBroadcaster = dashboardEventBroadcaster; + _addressNormalizer = addressNormalizer; _options = options.Value; _sessionSlots = new SemaphoreSlim(_options.Sessions.MaxSessions, _options.Sessions.MaxSessions); } @@ -506,7 +514,8 @@ public sealed class SessionManager : ISessionManager openedAt, eventStreaming, TimeSpan.FromSeconds(Math.Max(0, _options.Sessions.DetachGraceSeconds)), - TimeSpan.FromMilliseconds(Math.Max(0, _options.Sessions.WorkerReadyWaitTimeoutMs))); + TimeSpan.FromMilliseconds(Math.Max(0, _options.Sessions.WorkerReadyWaitTimeoutMs)), + _addressNormalizer); } private static string CreateClientCorrelationId( diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionServiceCollectionExtensions.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionServiceCollectionExtensions.cs index decc180..3f2cb14 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionServiceCollectionExtensions.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionServiceCollectionExtensions.cs @@ -8,6 +8,9 @@ public static class SessionServiceCollectionExtensions /// The service collection for chaining. public static IServiceCollection AddGatewaySessions(this IServiceCollection services) { + // Lifetime consistent with IGalaxyHierarchyCache (singleton); the normalizer reads the + // cache's current snapshot per call, so it holds no per-session or per-request state. + services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewayArrayWriteWiringTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewayArrayWriteWiringTests.cs new file mode 100644 index 0000000..204ed6e --- /dev/null +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewayArrayWriteWiringTests.cs @@ -0,0 +1,267 @@ +using System.Runtime.CompilerServices; +using ZB.MOM.WW.MxGateway.Contracts.Proto; +using ZB.MOM.WW.MxGateway.Contracts.Proto.Galaxy; +using ZB.MOM.WW.MxGateway.Server.Galaxy; +using ZB.MOM.WW.MxGateway.Server.Sessions; +using ZB.MOM.WW.MxGateway.Server.Workers; + +namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Sessions; + +/// +/// Integration coverage for the single outbound choke point +/// (): +/// array AddItem addresses gain the writable [] suffix and sparse array writes are +/// expanded to whole-array values before any command reaches the worker. +/// +public sealed class GatewayArrayWriteWiringTests +{ + /// + /// A bare array AddItem address is normalized to its writable array form on the wire, + /// and the normalized address lands in the tracked . + /// + [Fact] + public async Task AddItem_BareArrayAddress_NormalizedOnWireAndInRegistration() + { + CapturingWorkerClient worker = new(); + GatewaySession session = CreateReadySession(worker); + + WorkerCommand command = new() + { + Command = new MxCommand + { + Kind = MxCommandKind.AddItem, + AddItem = new AddItemCommand + { + ServerHandle = 1, + ItemDefinition = "Obj.Arr", + }, + }, + }; + + worker.NextReply = new WorkerCommandReply + { + Reply = new MxCommandReply + { + Kind = MxCommandKind.AddItem, + ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok }, + AddItem = new AddItemReply { ItemHandle = 42 }, + }, + }; + + await session.InvokeAsync(command, CancellationToken.None); + + Assert.NotNull(worker.LastCommand); + Assert.Equal("Obj.Arr[]", worker.LastCommand!.Command.AddItem.ItemDefinition); + + // Track the reply through the same path the gRPC service uses; the registration must carry + // the normalized address even though the public reply is tracked from a separate command copy. + MxCommand trackingCopy = new() + { + Kind = MxCommandKind.AddItem, + AddItem = new AddItemCommand + { + ServerHandle = 1, + ItemDefinition = "Obj.Arr", + }, + }; + session.TrackCommandReply(trackingCopy, worker.NextReply.Reply); + + Assert.True(session.TryGetItemRegistration(1, 42, out SessionItemRegistration registration)); + Assert.Equal("Obj.Arr[]", registration.TagAddress); + } + + /// A bare scalar AddItem address is forwarded unchanged. + [Fact] + public async Task AddItem_ScalarAddress_ForwardedUnchanged() + { + CapturingWorkerClient worker = new(); + GatewaySession session = CreateReadySession(worker); + + WorkerCommand command = new() + { + Command = new MxCommand + { + Kind = MxCommandKind.AddItem, + AddItem = new AddItemCommand + { + ServerHandle = 1, + ItemDefinition = "Obj.Scalar", + }, + }, + }; + + await session.InvokeAsync(command, CancellationToken.None); + + Assert.Equal("Obj.Scalar", worker.LastCommand!.Command.AddItem.ItemDefinition); + } + + /// + /// A sparse-array value is expanded to a full, default-filled + /// before reaching the worker; no sparse value is ever forwarded. + /// + [Fact] + public async Task Write_SparseArrayValue_ExpandedBeforeReachingWorker() + { + CapturingWorkerClient worker = new(); + GatewaySession session = CreateReadySession(worker); + + WorkerCommand command = new() + { + Command = new MxCommand + { + Kind = MxCommandKind.Write, + Write = new WriteCommand + { + ServerHandle = 1, + ItemHandle = 42, + Value = new MxValue + { + SparseArrayValue = new MxSparseArray + { + ElementDataType = MxDataType.Integer, + TotalLength = 4, + Elements = + { + new MxSparseElement + { + Index = 1, + Value = new MxValue { Int32Value = 7 }, + }, + }, + }, + }, + }, + }, + }; + + await session.InvokeAsync(command, CancellationToken.None); + + MxValue forwarded = worker.LastCommand!.Command.Write.Value; + Assert.Equal(MxValue.KindOneofCase.ArrayValue, forwarded.KindCase); + Assert.Equal(new[] { 0, 7, 0, 0 }, forwarded.ArrayValue.Int32Values.Values); + } + + private static GatewaySession CreateReadySession(IWorkerClient workerClient) + { + GatewaySession session = new( + sessionId: "session-array-write-wiring", + backendName: "mxaccess", + pipeName: "mxaccess-gateway-1-session-array-write-wiring", + nonce: "nonce", + clientIdentity: "client-1", + ownerKeyId: null, + clientSessionName: "test-session", + clientCorrelationId: "client-correlation-1", + commandTimeout: TimeSpan.FromSeconds(5), + startupTimeout: TimeSpan.FromSeconds(5), + shutdownTimeout: TimeSpan.FromSeconds(5), + leaseDuration: TimeSpan.FromMinutes(30), + openedAt: DateTimeOffset.UtcNow, + addressNormalizer: CreateNormalizer()); + session.AttachWorkerClient(workerClient); + session.MarkReady(); + return session; + } + + private static ArrayAddressNormalizer CreateNormalizer() + { + IReadOnlyList objects = + [ + new GalaxyObject + { + GobjectId = 1, + TagName = "Obj", + ContainedName = "Obj", + Attributes = + { + new GalaxyAttribute + { + AttributeName = "Arr", + FullTagReference = "Obj.Arr[]", + IsArray = true, + }, + new GalaxyAttribute + { + AttributeName = "Scalar", + FullTagReference = "Obj.Scalar", + IsArray = false, + }, + }, + }, + ]; + + GalaxyHierarchyCacheEntry entry = GalaxyHierarchyCacheEntry.Empty with + { + Status = GalaxyCacheStatus.Healthy, + Objects = objects, + Index = GalaxyHierarchyIndex.Build(objects), + }; + + return new ArrayAddressNormalizer(new StubGalaxyHierarchyCache(entry)); + } + + private sealed class StubGalaxyHierarchyCache(GalaxyHierarchyCacheEntry current) : IGalaxyHierarchyCache + { + /// Gets the current cache entry. + public GalaxyHierarchyCacheEntry Current { get; } = current; + + /// + public Task RefreshAsync(CancellationToken cancellationToken) => Task.CompletedTask; + + /// + public Task WaitForFirstLoadAsync(CancellationToken cancellationToken) => Task.CompletedTask; + } + + private sealed class CapturingWorkerClient : IWorkerClient + { + /// Gets the most recent command forwarded to the worker. + public WorkerCommand? LastCommand { get; private set; } + + /// Gets or sets the reply returned by the next invocation. + public WorkerCommandReply NextReply { get; set; } = new(); + + /// Gets the session identifier. + public string SessionId { get; } = "session-array-write-wiring"; + + /// Gets the worker process identifier. + public int? ProcessId { get; } = 1234; + + /// Gets the worker client state. + public WorkerClientState State { get; } = WorkerClientState.Ready; + + /// Gets the last recorded heartbeat timestamp. + public DateTimeOffset LastHeartbeatAt { get; } = DateTimeOffset.UtcNow; + + /// + public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask; + + /// + public Task InvokeAsync( + WorkerCommand command, + TimeSpan timeout, + CancellationToken cancellationToken) + { + LastCommand = command; + return Task.FromResult(NextReply); + } + + /// + public async IAsyncEnumerable ReadEventsAsync( + [EnumeratorCancellation] CancellationToken cancellationToken) + { + await Task.CompletedTask.ConfigureAwait(false); + yield break; + } + + /// + public Task ShutdownAsync(TimeSpan timeout, CancellationToken cancellationToken) => Task.CompletedTask; + + /// + public void Kill(string reason) + { + } + + /// + public ValueTask DisposeAsync() => ValueTask.CompletedTask; + } +}