feat(gateway): normalize array AddItem suffix and expand sparse writes at the worker boundary

This commit is contained in:
Joseph Doherty
2026-06-18 03:10:13 -04:00
parent 3a8f2bed4e
commit f0ef7ea0a8
4 changed files with 381 additions and 5 deletions
@@ -38,6 +38,7 @@ public sealed class GatewaySession
private Task? _dashboardMirrorTask;
private CancellationTokenSource? _dashboardMirrorCts;
private readonly Dictionary<(int ServerHandle, int ItemHandle), SessionItemRegistration> _items = [];
private readonly ArrayAddressNormalizer? _addressNormalizer;
/// <summary>
/// Initializes a gateway session with session metadata and timeout configuration.
@@ -133,6 +134,12 @@ public sealed class GatewaySession
/// fast immediately. <see cref="TimeSpan.Zero"/> (the default) disables the wait and
/// preserves the original fail-fast behavior byte-for-byte.
/// </param>
/// <param name="addressNormalizer">
/// Rewrites bare array <c>AddItem</c>/<c>AddItem2</c> addresses to their writable <c>[]</c>
/// form using Galaxy metadata at the outbound choke point (and on registration tracking).
/// When <see langword="null"/> (legacy unit-construction paths that do not exercise Galaxy
/// metadata), addresses pass through unchanged.
/// </param>
public GatewaySession(
string sessionId,
string backendName,
@@ -149,7 +156,8 @@ public sealed class GatewaySession
DateTimeOffset openedAt,
SessionEventStreaming? eventStreaming = null,
TimeSpan detachGrace = default,
TimeSpan workerReadyWaitTimeout = default)
TimeSpan workerReadyWaitTimeout = default,
ArrayAddressNormalizer? addressNormalizer = null)
{
if (string.IsNullOrWhiteSpace(sessionId))
{
@@ -189,6 +197,7 @@ public sealed class GatewaySession
_eventStreaming = eventStreaming ?? SessionEventStreaming.Default;
_detachGrace = detachGrace > TimeSpan.Zero ? detachGrace : TimeSpan.Zero;
_workerReadyWaitTimeout = workerReadyWaitTimeout > TimeSpan.Zero ? workerReadyWaitTimeout : TimeSpan.Zero;
_addressNormalizer = addressNormalizer;
}
/// <summary>
@@ -948,12 +957,95 @@ public sealed class GatewaySession
WorkerCommand command,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(command);
if (command.Command is not null)
{
NormalizeOutboundCommand(command.Command);
}
IWorkerClient workerClient = await GetReadyWorkerClientAsync(cancellationToken).ConfigureAwait(false);
TouchClientActivity(_eventStreaming.TimeProvider.GetUtcNow());
return await workerClient.InvokeAsync(command, CommandTimeout, cancellationToken).ConfigureAwait(false);
}
// Single outbound choke point for the two array-write ergonomics shims (Task 3):
// 1. AddItem/AddItem2 array addresses gain the writable "[]" suffix when Galaxy metadata
// reports them as arrays, so the worker registers a write-capable handle. The mutation
// lands on the same MxCommand instance forwarded to the worker.
// 2. Sparse array write values are expanded to whole-array values, because MXAccess has no
// partial-array write primitive — the worker only ever sees a full MxArray.
// SparseArrayExpander.Expand throws RpcException(InvalidArgument) for an invalid sparse payload;
// that propagates out of InvokeAsync as the desired client-facing error and is deliberately not
// caught here.
private void NormalizeOutboundCommand(MxCommand command)
{
switch (command.PayloadCase)
{
case MxCommand.PayloadOneofCase.AddItem:
command.AddItem.ItemDefinition = NormalizeAddress(command.AddItem.ItemDefinition);
break;
case MxCommand.PayloadOneofCase.AddItem2:
command.AddItem2.ItemDefinition = NormalizeAddress(command.AddItem2.ItemDefinition);
break;
case MxCommand.PayloadOneofCase.Write:
ExpandValue(command.Write.Value);
break;
case MxCommand.PayloadOneofCase.WriteSecured:
ExpandValue(command.WriteSecured.Value);
break;
case MxCommand.PayloadOneofCase.Write2:
ExpandValue(command.Write2.Value);
break;
case MxCommand.PayloadOneofCase.WriteSecured2:
ExpandValue(command.WriteSecured2.Value);
break;
case MxCommand.PayloadOneofCase.WriteBulk:
foreach (WriteBulkEntry entry in command.WriteBulk.Entries)
{
ExpandValue(entry.Value);
}
break;
case MxCommand.PayloadOneofCase.Write2Bulk:
foreach (Write2BulkEntry entry in command.Write2Bulk.Entries)
{
ExpandValue(entry.Value);
}
break;
case MxCommand.PayloadOneofCase.WriteSecuredBulk:
foreach (WriteSecuredBulkEntry entry in command.WriteSecuredBulk.Entries)
{
ExpandValue(entry.Value);
}
break;
case MxCommand.PayloadOneofCase.WriteSecured2Bulk:
foreach (WriteSecured2BulkEntry entry in command.WriteSecured2Bulk.Entries)
{
ExpandValue(entry.Value);
}
break;
}
}
// Best-effort array-suffix rewrite; the normalizer is null in legacy unit-construction paths
// that do not exercise Galaxy metadata, in which case the address passes through unchanged.
private string NormalizeAddress(string address) =>
_addressNormalizer?.Normalize(address) ?? address;
// MXAccess writes replace the whole array; expand a sparse value in place so the worker only
// ever receives a whole-array MxValue. No-op for null or non-sparse values.
private static void ExpandValue(MxValue? value)
{
if (value is not null)
{
SparseArrayExpander.Expand(value);
}
}
/// <summary>Gets the item registration for a server and item handle pair.</summary>
/// <param name="serverHandle">The MXAccess server handle.</param>
/// <param name="itemHandle">The MXAccess item handle.</param>
@@ -985,11 +1077,16 @@ public sealed class GatewaySession
{
switch (command.Kind)
{
// The public reply is tracked from the pre-mapping MxCommand instance, which is a
// separate copy from the one mutated at the InvokeAsync choke point (the gRPC mapper
// deep-clones before forwarding). Re-apply the array-suffix normalization here so the
// registration's TagAddress matches the address the worker actually registered.
// Normalize is idempotent for an already-suffixed address.
case MxCommandKind.AddItem when reply.AddItem is not null:
TrackItem(command.AddItem.ServerHandle, reply.AddItem.ItemHandle, command.AddItem.ItemDefinition);
TrackItem(command.AddItem.ServerHandle, reply.AddItem.ItemHandle, NormalizeAddress(command.AddItem.ItemDefinition));
break;
case MxCommandKind.AddItem2 when reply.AddItem2 is not null:
TrackItem(command.AddItem2.ServerHandle, reply.AddItem2.ItemHandle, command.AddItem2.ItemDefinition);
TrackItem(command.AddItem2.ServerHandle, reply.AddItem2.ItemHandle, NormalizeAddress(command.AddItem2.ItemDefinition));
break;
case MxCommandKind.AddBufferedItem when reply.AddBufferedItem is not null:
TrackItem(command.AddBufferedItem.ServerHandle, reply.AddBufferedItem.ItemHandle, command.AddBufferedItem.ItemDefinition);
@@ -29,6 +29,7 @@ public sealed class SessionManager : ISessionManager
private readonly Grpc.MxAccessGrpcMapper _eventMapper;
private readonly ILogger<SessionEventDistributor> _distributorLogger;
private readonly Dashboard.Hubs.IDashboardEventBroadcaster? _dashboardEventBroadcaster;
private readonly ArrayAddressNormalizer? _addressNormalizer;
/// <summary>
/// Initializes a new instance of <see cref="SessionManager"/>.
@@ -47,6 +48,11 @@ public sealed class SessionManager : ISessionManager
/// dashboard receives events regardless of whether a gRPC client is streaming. Null in
/// unit tests that do not exercise the dashboard mirror.
/// </param>
/// <param name="addressNormalizer">
/// Rewrites bare array AddItem addresses to their writable <c>[]</c> form using Galaxy
/// metadata; handed to each session so the normalization runs at the outbound choke point.
/// Null in unit tests that do not exercise array-write ergonomics.
/// </param>
public SessionManager(
ISessionRegistry registry,
ISessionWorkerClientFactory workerClientFactory,
@@ -56,7 +62,8 @@ public sealed class SessionManager : ISessionManager
ILogger<SessionManager>? logger = null,
Grpc.MxAccessGrpcMapper? eventMapper = null,
ILogger<SessionEventDistributor>? distributorLogger = null,
Dashboard.Hubs.IDashboardEventBroadcaster? dashboardEventBroadcaster = null)
Dashboard.Hubs.IDashboardEventBroadcaster? dashboardEventBroadcaster = null,
ArrayAddressNormalizer? addressNormalizer = null)
{
_registry = registry ?? throw new ArgumentNullException(nameof(registry));
_workerClientFactory = workerClientFactory ?? throw new ArgumentNullException(nameof(workerClientFactory));
@@ -67,6 +74,7 @@ public sealed class SessionManager : ISessionManager
_eventMapper = eventMapper ?? new Grpc.MxAccessGrpcMapper();
_distributorLogger = distributorLogger ?? NullLogger<SessionEventDistributor>.Instance;
_dashboardEventBroadcaster = dashboardEventBroadcaster;
_addressNormalizer = addressNormalizer;
_options = options.Value;
_sessionSlots = new SemaphoreSlim(_options.Sessions.MaxSessions, _options.Sessions.MaxSessions);
}
@@ -506,7 +514,8 @@ public sealed class SessionManager : ISessionManager
openedAt,
eventStreaming,
TimeSpan.FromSeconds(Math.Max(0, _options.Sessions.DetachGraceSeconds)),
TimeSpan.FromMilliseconds(Math.Max(0, _options.Sessions.WorkerReadyWaitTimeoutMs)));
TimeSpan.FromMilliseconds(Math.Max(0, _options.Sessions.WorkerReadyWaitTimeoutMs)),
_addressNormalizer);
}
private static string CreateClientCorrelationId(
@@ -8,6 +8,9 @@ public static class SessionServiceCollectionExtensions
/// <returns>The service collection for chaining.</returns>
public static IServiceCollection AddGatewaySessions(this IServiceCollection services)
{
// Lifetime consistent with IGalaxyHierarchyCache (singleton); the normalizer reads the
// cache's current snapshot per call, so it holds no per-session or per-request state.
services.AddSingleton<ArrayAddressNormalizer>();
services.AddSingleton<ISessionRegistry, SessionRegistry>();
services.AddSingleton<ISessionWorkerClientFactory, SessionWorkerClientFactory>();
services.AddSingleton<ISessionManager, SessionManager>();
@@ -0,0 +1,267 @@
using System.Runtime.CompilerServices;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Contracts.Proto.Galaxy;
using ZB.MOM.WW.MxGateway.Server.Galaxy;
using ZB.MOM.WW.MxGateway.Server.Sessions;
using ZB.MOM.WW.MxGateway.Server.Workers;
namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Sessions;
/// <summary>
/// Integration coverage for the single outbound choke point
/// (<see cref="GatewaySession.InvokeAsync(WorkerCommand, System.Threading.CancellationToken)"/>):
/// array <c>AddItem</c> addresses gain the writable <c>[]</c> suffix and sparse array writes are
/// expanded to whole-array values before any command reaches the worker.
/// </summary>
public sealed class GatewayArrayWriteWiringTests
{
/// <summary>
/// A bare array <c>AddItem</c> address is normalized to its writable array form on the wire,
/// and the normalized address lands in the tracked <see cref="SessionItemRegistration"/>.
/// </summary>
[Fact]
public async Task AddItem_BareArrayAddress_NormalizedOnWireAndInRegistration()
{
CapturingWorkerClient worker = new();
GatewaySession session = CreateReadySession(worker);
WorkerCommand command = new()
{
Command = new MxCommand
{
Kind = MxCommandKind.AddItem,
AddItem = new AddItemCommand
{
ServerHandle = 1,
ItemDefinition = "Obj.Arr",
},
},
};
worker.NextReply = new WorkerCommandReply
{
Reply = new MxCommandReply
{
Kind = MxCommandKind.AddItem,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
AddItem = new AddItemReply { ItemHandle = 42 },
},
};
await session.InvokeAsync(command, CancellationToken.None);
Assert.NotNull(worker.LastCommand);
Assert.Equal("Obj.Arr[]", worker.LastCommand!.Command.AddItem.ItemDefinition);
// Track the reply through the same path the gRPC service uses; the registration must carry
// the normalized address even though the public reply is tracked from a separate command copy.
MxCommand trackingCopy = new()
{
Kind = MxCommandKind.AddItem,
AddItem = new AddItemCommand
{
ServerHandle = 1,
ItemDefinition = "Obj.Arr",
},
};
session.TrackCommandReply(trackingCopy, worker.NextReply.Reply);
Assert.True(session.TryGetItemRegistration(1, 42, out SessionItemRegistration registration));
Assert.Equal("Obj.Arr[]", registration.TagAddress);
}
/// <summary>A bare scalar <c>AddItem</c> address is forwarded unchanged.</summary>
[Fact]
public async Task AddItem_ScalarAddress_ForwardedUnchanged()
{
CapturingWorkerClient worker = new();
GatewaySession session = CreateReadySession(worker);
WorkerCommand command = new()
{
Command = new MxCommand
{
Kind = MxCommandKind.AddItem,
AddItem = new AddItemCommand
{
ServerHandle = 1,
ItemDefinition = "Obj.Scalar",
},
},
};
await session.InvokeAsync(command, CancellationToken.None);
Assert.Equal("Obj.Scalar", worker.LastCommand!.Command.AddItem.ItemDefinition);
}
/// <summary>
/// A sparse-array <see cref="WriteCommand"/> value is expanded to a full, default-filled
/// <see cref="MxArray"/> before reaching the worker; no sparse value is ever forwarded.
/// </summary>
[Fact]
public async Task Write_SparseArrayValue_ExpandedBeforeReachingWorker()
{
CapturingWorkerClient worker = new();
GatewaySession session = CreateReadySession(worker);
WorkerCommand command = new()
{
Command = new MxCommand
{
Kind = MxCommandKind.Write,
Write = new WriteCommand
{
ServerHandle = 1,
ItemHandle = 42,
Value = new MxValue
{
SparseArrayValue = new MxSparseArray
{
ElementDataType = MxDataType.Integer,
TotalLength = 4,
Elements =
{
new MxSparseElement
{
Index = 1,
Value = new MxValue { Int32Value = 7 },
},
},
},
},
},
},
};
await session.InvokeAsync(command, CancellationToken.None);
MxValue forwarded = worker.LastCommand!.Command.Write.Value;
Assert.Equal(MxValue.KindOneofCase.ArrayValue, forwarded.KindCase);
Assert.Equal(new[] { 0, 7, 0, 0 }, forwarded.ArrayValue.Int32Values.Values);
}
private static GatewaySession CreateReadySession(IWorkerClient workerClient)
{
GatewaySession session = new(
sessionId: "session-array-write-wiring",
backendName: "mxaccess",
pipeName: "mxaccess-gateway-1-session-array-write-wiring",
nonce: "nonce",
clientIdentity: "client-1",
ownerKeyId: null,
clientSessionName: "test-session",
clientCorrelationId: "client-correlation-1",
commandTimeout: TimeSpan.FromSeconds(5),
startupTimeout: TimeSpan.FromSeconds(5),
shutdownTimeout: TimeSpan.FromSeconds(5),
leaseDuration: TimeSpan.FromMinutes(30),
openedAt: DateTimeOffset.UtcNow,
addressNormalizer: CreateNormalizer());
session.AttachWorkerClient(workerClient);
session.MarkReady();
return session;
}
private static ArrayAddressNormalizer CreateNormalizer()
{
IReadOnlyList<GalaxyObject> objects =
[
new GalaxyObject
{
GobjectId = 1,
TagName = "Obj",
ContainedName = "Obj",
Attributes =
{
new GalaxyAttribute
{
AttributeName = "Arr",
FullTagReference = "Obj.Arr[]",
IsArray = true,
},
new GalaxyAttribute
{
AttributeName = "Scalar",
FullTagReference = "Obj.Scalar",
IsArray = false,
},
},
},
];
GalaxyHierarchyCacheEntry entry = GalaxyHierarchyCacheEntry.Empty with
{
Status = GalaxyCacheStatus.Healthy,
Objects = objects,
Index = GalaxyHierarchyIndex.Build(objects),
};
return new ArrayAddressNormalizer(new StubGalaxyHierarchyCache(entry));
}
private sealed class StubGalaxyHierarchyCache(GalaxyHierarchyCacheEntry current) : IGalaxyHierarchyCache
{
/// <summary>Gets the current cache entry.</summary>
public GalaxyHierarchyCacheEntry Current { get; } = current;
/// <inheritdoc />
public Task RefreshAsync(CancellationToken cancellationToken) => Task.CompletedTask;
/// <inheritdoc />
public Task WaitForFirstLoadAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}
private sealed class CapturingWorkerClient : IWorkerClient
{
/// <summary>Gets the most recent command forwarded to the worker.</summary>
public WorkerCommand? LastCommand { get; private set; }
/// <summary>Gets or sets the reply returned by the next invocation.</summary>
public WorkerCommandReply NextReply { get; set; } = new();
/// <summary>Gets the session identifier.</summary>
public string SessionId { get; } = "session-array-write-wiring";
/// <summary>Gets the worker process identifier.</summary>
public int? ProcessId { get; } = 1234;
/// <summary>Gets the worker client state.</summary>
public WorkerClientState State { get; } = WorkerClientState.Ready;
/// <summary>Gets the last recorded heartbeat timestamp.</summary>
public DateTimeOffset LastHeartbeatAt { get; } = DateTimeOffset.UtcNow;
/// <inheritdoc />
public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;
/// <inheritdoc />
public Task<WorkerCommandReply> InvokeAsync(
WorkerCommand command,
TimeSpan timeout,
CancellationToken cancellationToken)
{
LastCommand = command;
return Task.FromResult(NextReply);
}
/// <inheritdoc />
public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
[EnumeratorCancellation] CancellationToken cancellationToken)
{
await Task.CompletedTask.ConfigureAwait(false);
yield break;
}
/// <inheritdoc />
public Task ShutdownAsync(TimeSpan timeout, CancellationToken cancellationToken) => Task.CompletedTask;
/// <inheritdoc />
public void Kill(string reason)
{
}
/// <inheritdoc />
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
}
}