v2-mxgw follow-ups: production reads, secret resolution, perf knobs

Lands the five concrete code-level follow-ups identified after Phase 7.1:

#1 GalaxyDriver.ReadAsync now works in production. Previously threw
   NotSupportedException when no test reader was injected. New path
   subscribes through the existing SubscriptionRegistry + EventPump,
   waits for the first OnDataChange per item handle (gw pushes the
   initial value after SubscribeBulk), then unsubscribes. Tags the gw
   rejects up front, or that don't publish before the caller's CT
   fires, return Bad-status snapshots in input order so callers still
   get one snapshot per requested reference.

#2 ResolveApiKey() routes Gateway.ApiKeySecretRef through three forms:
   env:NAME, file:PATH, or literal-string fallback. A future DPAPI arm
   slots in here without touching the call site.

#3 GatewayGalaxySubscriber actually honors bufferedUpdateIntervalMs now
   (was being silently dropped). Calls SetBufferedUpdateInterval via
   the gw's MxCommandKind.SetBufferedUpdateInterval before SubscribeBulk
   when the requested interval differs from the cached last-applied
   value. Soft-fails on a non-Ok protocol status (the SubscribeBulk
   still succeeds at gw cadence).

#4 GalaxyMxAccessOptions.EventPumpChannelCapacity surfaces the bounded-
   channel size through DriverConfig JSON, defaulting to 50_000.

#5 Stale doc-comments in HostStatusAggregator and GatewayGalaxySubscriber
   describing follow-ups that already shipped.

Tests: +6 (read subscribe-once happy path + rejected-tag fallback;
five resolver scenarios). Total Galaxy driver tests now 180/180 green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-04-29 17:27:24 -04:00
parent d5a87c7467
commit 42f41fbe50
8 changed files with 406 additions and 28 deletions

View File

@@ -1,6 +1,7 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using MxGateway.Client;
using MxGateway.Contracts.Proto;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Browse;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Config;
@@ -262,10 +263,58 @@ public sealed class GalaxyDriver
}
}
/// <summary>
/// Resolves <c>Gateway.ApiKeySecretRef</c> to the actual API-key bytes. Three
/// forms supported, evaluated in order:
/// <list type="number">
/// <item><c>env:NAME</c> — reads <c>Environment.GetEnvironmentVariable(NAME)</c>.
/// Throws when the variable is unset, so a misconfigured deployment fails
/// fast at InitializeAsync rather than silently sending an empty key.</item>
/// <item><c>file:PATH</c> — reads UTF-8 text from <c>PATH</c>, trimming
/// whitespace. Lets operators stash the key in an ACL'd file outside the
/// repo (the same pattern as the legacy <c>.local/galaxy-host-secret.txt</c>).</item>
/// <item>Anything else — used as the literal API key. Convenient for dev,
/// and avoids breaking existing configs that pre-date this resolver.</item>
/// </list>
/// A future PR can swap any of these arms for a DPAPI-backed lookup without
/// changing the call site.
/// </summary>
internal static string ResolveApiKey(string secretRef)
{
ArgumentException.ThrowIfNullOrEmpty(secretRef);
if (secretRef.StartsWith("env:", StringComparison.OrdinalIgnoreCase))
{
var name = secretRef[4..];
var value = Environment.GetEnvironmentVariable(name);
return !string.IsNullOrEmpty(value)
? value
: throw new InvalidOperationException(
$"Galaxy.Gateway.ApiKeySecretRef='{secretRef}' resolves to env var '{name}', but it is unset.");
}
if (secretRef.StartsWith("file:", StringComparison.OrdinalIgnoreCase))
{
var path = secretRef[5..];
if (!File.Exists(path))
{
throw new InvalidOperationException(
$"Galaxy.Gateway.ApiKeySecretRef='{secretRef}' points at '{path}', which doesn't exist.");
}
var contents = File.ReadAllText(path).Trim();
return !string.IsNullOrEmpty(contents)
? contents
: throw new InvalidOperationException(
$"Galaxy.Gateway.ApiKeySecretRef='{secretRef}' file '{path}' is empty.");
}
return secretRef;
}
private static MxGatewayClientOptions BuildClientOptions(GalaxyGatewayOptions gw) => new()
{
Endpoint = new Uri(gw.Endpoint, UriKind.Absolute),
ApiKey = gw.ApiKeySecretRef,
ApiKey = ResolveApiKey(gw.ApiKeySecretRef),
UseTls = gw.UseTls,
CaCertificatePath = gw.CaCertificatePath,
ConnectTimeout = TimeSpan.FromSeconds(gw.ConnectTimeoutSeconds),
@@ -367,7 +416,7 @@ public sealed class GalaxyDriver
private SecurityClassification ResolveSecurity(string fullReference) =>
_securityByFullRef.TryGetValue(fullReference, out var sec) ? sec : SecurityClassification.FreeAccess;
// ===== IReadable (PR 4.2 — abstraction; PR 4.4 supplies production reader) =====
// ===== IReadable =====
/// <inheritdoc />
public Task<IReadOnlyList<DataValueSnapshot>> ReadAsync(
@@ -377,19 +426,152 @@ public sealed class GalaxyDriver
ArgumentNullException.ThrowIfNull(fullReferences);
if (fullReferences.Count == 0) return Task.FromResult<IReadOnlyList<DataValueSnapshot>>([]);
if (_dataReader is null)
if (_dataReader is not null)
{
// The production GW-backed reader builds on the StreamEvents pump that PR 4.4
// ships; until then a real gateway-driver instance can't fulfill reads.
// Tests that need to exercise IReadable inject a fake reader via the internal
// ctor; production deployments running on this PR should keep the
// legacy-host backend selected via the Galaxy:Backend flag (PR 4.W).
throw new NotSupportedException(
"GalaxyDriver.ReadAsync requires the StreamEvents-backed reader from PR 4.4. " +
"Until that lands, route reads through the legacy-host backend (Galaxy:Backend=legacy-host).");
// Test-only path — tests inject a canned reader via the internal ctor.
return _dataReader.ReadAsync(fullReferences, cancellationToken);
}
return _dataReader.ReadAsync(fullReferences, cancellationToken);
if (_subscriber is null)
{
throw new NotSupportedException(
"GalaxyDriver.ReadAsync requires a connected GalaxyMxSession (production runtime not built). " +
"Either inject a test seam via the internal ctor or call InitializeAsync against a real gateway.");
}
return ReadViaSubscribeOnceAsync(fullReferences, cancellationToken);
}
/// <summary>
/// Production read path. MxAccess has no one-shot Read RPC — every value comes
/// through the event stream. We synthesise a Read by:
/// <list type="number">
/// <item>Subscribing the requested tags through the existing
/// <see cref="SubscriptionRegistry"/> + <see cref="EventPump"/>.</item>
/// <item>Waiting for the first <c>OnDataChange</c> per item handle (the gateway
/// pushes the current value as the initial event after a SubscribeBulk).</item>
/// <item>Unsubscribing.</item>
/// </list>
/// Tags the gw rejects at SubscribeBulk time, or that never publish before the
/// caller's cancellation token fires, return a Bad-status snapshot in input order
/// so the caller still sees one snapshot per requested reference.
/// </summary>
private async Task<IReadOnlyList<DataValueSnapshot>> ReadViaSubscribeOnceAsync(
IReadOnlyList<string> fullReferences, CancellationToken cancellationToken)
{
var pump = EnsureEventPumpStarted();
var subscriptionId = _subscriptions.NextSubscriptionId();
// Pre-allocate one TaskCompletionSource per full-reference so the OnDataChange
// handler can complete them out-of-order as events arrive. Wired BEFORE the
// SubscribeBulk call so we don't race with the first event the gw pushes.
var pendingByRef = new Dictionary<string, TaskCompletionSource<DataValueSnapshot>>(
StringComparer.OrdinalIgnoreCase);
foreach (var fullRef in fullReferences.Distinct(StringComparer.OrdinalIgnoreCase))
{
pendingByRef[fullRef] = new TaskCompletionSource<DataValueSnapshot>(
TaskCreationOptions.RunContinuationsAsynchronously);
}
EventHandler<DataChangeEventArgs> handler = (_, args) =>
{
// Filter to OUR subscription — the pump's OnDataChange fans out across all
// subscriptions on the driver, and we don't want a parallel ISubscribable
// caller's events to leak into our read.
if (args.SubscriptionHandle is GalaxySubscriptionHandle gsh
&& gsh.SubscriptionId == subscriptionId
&& pendingByRef.TryGetValue(args.FullReference, out var tcs))
{
tcs.TrySetResult(args.Snapshot);
}
};
pump.OnDataChange += handler;
var bufferedIntervalMs = _options.MxAccess.PublishingIntervalMs;
IReadOnlyList<SubscribeResult> results;
try
{
results = await _subscriber!
.SubscribeBulkAsync(fullReferences, bufferedIntervalMs, cancellationToken)
.ConfigureAwait(false);
}
catch
{
pump.OnDataChange -= handler;
throw;
}
// Register bindings so the pump knows to dispatch events for these handles.
var bindings = new List<TagBinding>(fullReferences.Count);
for (var i = 0; i < fullReferences.Count; i++)
{
var fullRef = fullReferences[i];
var match = results.FirstOrDefault(r => string.Equals(r.TagAddress, fullRef, StringComparison.OrdinalIgnoreCase));
var itemHandle = match is { WasSuccessful: true } ? match.ItemHandle : 0;
bindings.Add(new TagBinding(fullRef, itemHandle));
// Tags the gw rejected up front — complete with Bad status now so the
// wait below doesn't time out on them.
if (itemHandle <= 0
&& pendingByRef.TryGetValue(fullRef, out var rejectedTcs))
{
rejectedTcs.TrySetResult(new DataValueSnapshot(
Value: null,
StatusCode: 0x80000000u, // Bad
SourceTimestampUtc: null,
ServerTimestampUtc: DateTime.UtcNow));
}
}
_subscriptions.Register(subscriptionId, bindings);
try
{
// Wait for every pending TCS to complete or the caller's CT to fire. When the
// CT fires before all values arrive, fill the still-pending entries with a
// Bad-status snapshot rather than throwing — Read semantics let callers see
// partial results.
using var registration = cancellationToken.Register(() =>
{
foreach (var tcs in pendingByRef.Values)
{
tcs.TrySetResult(new DataValueSnapshot(
Value: null,
StatusCode: 0x800B0000u, // BadTimeout
SourceTimestampUtc: null,
ServerTimestampUtc: DateTime.UtcNow));
}
});
var snapshots = new DataValueSnapshot[fullReferences.Count];
for (var i = 0; i < fullReferences.Count; i++)
{
snapshots[i] = await pendingByRef[fullReferences[i]].Task.ConfigureAwait(false);
}
return snapshots;
}
finally
{
pump.OnDataChange -= handler;
// Drop the bindings + unsubscribe the live handles. UnsubscribeBulkAsync's
// failure isn't fatal — the registry is already cleared, so any straggling
// event from the gw would be a no-op fan-out.
_subscriptions.Remove(subscriptionId);
var liveHandles = bindings.Where(b => b.ItemHandle > 0).Select(b => b.ItemHandle).ToArray();
if (liveHandles.Length > 0)
{
try
{
await _subscriber!.UnsubscribeBulkAsync(liveHandles, CancellationToken.None)
.ConfigureAwait(false);
}
catch (Exception ex)
{
_logger.LogWarning(ex,
"GalaxyDriver.ReadViaSubscribeOnceAsync UnsubscribeBulk failed for {Count} handle(s) — registry already cleared.",
liveHandles.Length);
}
}
}
}
// ===== IWritable (PR 4.3) =====
@@ -520,6 +702,7 @@ public sealed class GalaxyDriver
if (_eventPump is not null) return _eventPump;
_eventPump = new EventPump(
_subscriber!, _subscriptions, _logger,
channelCapacity: _options.MxAccess.EventPumpChannelCapacity,
clientName: _options.MxAccess.ClientName);
_eventPump.OnDataChange += OnPumpDataChange;
_eventPump.Start();
@@ -564,9 +747,7 @@ public sealed class GalaxyDriver
var clientOptions = new MxGatewayClientOptions
{
Endpoint = new Uri(gw.Endpoint, UriKind.Absolute),
// PR 4.1 stub: ApiKeySecretRef is currently treated as the literal API key.
// PR 4.W (or a follow-up) wires up DPAPI-backed secret resolution.
ApiKey = gw.ApiKeySecretRef,
ApiKey = ResolveApiKey(gw.ApiKeySecretRef),
UseTls = gw.UseTls,
CaCertificatePath = gw.CaCertificatePath,
ConnectTimeout = TimeSpan.FromSeconds(gw.ConnectTimeoutSeconds),