From 9f7ae209954f405968b5f0aed9e886bcb2034583 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 23 May 2026 07:45:08 -0400 Subject: [PATCH] fix(driver-galaxy): resolve Low code-review findings (Driver.Galaxy-005,010,012,013) - Driver.Galaxy-005: rewrite the EventPump BoundedChannelOptions comment to honestly describe the Wait+TryWrite pattern. - Driver.Galaxy-010: ResolveApiKey now warns when a literal API key is used in production wiring; added an explicit dev: prefix for known cleartext-in-dev cases and rewrote the GalaxyGatewayOptions doc. - Driver.Galaxy-012: O(1) reverse-lookup for SubscriptionRegistry dispatch via per-entry FullRefByItemHandle map; immutable hash-set for the cross-binding reverse map; SubscribeAsync / ReadViaSubscribeOnce use BuildResultIndex for per-reference correlation. - Driver.Galaxy-013: ReinitializeAsync now validates the incoming JSON against the running options; ReplayOnSessionLost honoured by the Replay path; class summary rewritten to describe the shipped surface. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- code-reviews/Driver.Galaxy/findings.md | 18 +- .../Config/GalaxyDriverOptions.cs | 17 +- .../GalaxyDriver.cs | 168 +++++++++++++++--- .../Runtime/EventPump.cs | 11 +- .../Runtime/SubscriptionRegistry.cs | 69 +++++-- .../GalaxyDriverApiKeyResolverTests.cs | 56 ++++++ .../GalaxyDriverFactoryTests.cs | 18 +- .../GalaxyDriverInfrastructureTests.cs | 115 ++++++++++++ .../Runtime/SubscriptionRegistryTests.cs | 30 ++++ 9 files changed, 444 insertions(+), 58 deletions(-) diff --git a/code-reviews/Driver.Galaxy/findings.md b/code-reviews/Driver.Galaxy/findings.md index 167c848..e8ae0f2 100644 --- a/code-reviews/Driver.Galaxy/findings.md +++ b/code-reviews/Driver.Galaxy/findings.md @@ -7,7 +7,7 @@ | Review date | 2026-05-22 | | Commit reviewed | `76d35d1` | | Status | Reviewed | -| Open findings | 4 | +| Open findings | 0 | ## Checklist coverage @@ -93,13 +93,13 @@ | Severity | Low | | Category | OtOpcUa conventions | | Location | `Runtime/EventPump.cs:81-88` | -| Status | Open | +| Status | Resolved | **Description:** The `BoundedChannelOptions` comment states "Newest-dropped policy: when full, the producer's TryWrite returns false ... We do this manually rather than relying on `BoundedChannelFullMode.DropWrite`" — but the option is then set to `FullMode = BoundedChannelFullMode.Wait`. With `Wait`, `TryWrite` returning `false` on a full channel is correct behaviour, so the code works, but the comment naming the mode and the actual mode disagree, which is confusing for a maintainer deciding whether the policy is `Wait`, `DropWrite`, or `DropNewest`. **Recommendation:** Either reword the comment to say "we use `Wait` mode but never call the awaitable `WriteAsync` — `TryWrite` gives us synchronous newest-dropped semantics", or switch to `BoundedChannelFullMode.DropWrite` and keep the manual drop count. Make the comment and the mode consistent. 
-**Resolution:** _(open)_ +**Resolution:** Resolved 2026-05-23 — reworded the `BoundedChannelOptions` comment to say "we use FullMode.Wait but never call the awaitable WriteAsync — only synchronous TryWrite, which returns false immediately on a full channel and lets us account for drops on the EventsDropped counter". Also explains why we deliberately do NOT use `BoundedChannelFullMode.DropWrite` (it would silently discard without surfacing on the counter). Comment and `FullMode` value now agree. ### Driver.Galaxy-006 @@ -168,13 +168,13 @@ | Severity | Low | | Category | Security | | Location | `GalaxyDriver.cs:311-341` | -| Status | Open | +| Status | Resolved | **Description:** `ResolveApiKey` supports an `env:`/`file:` indirection and otherwise treats the config string as the literal API key ("Anything else — used as the literal API key. Convenient for dev"). `GalaxyGatewayOptions`' own XML doc claims "the API key never appears in cleartext config". The literal-key fallback silently permits a plaintext API key in the `DriverConfig` JSON column of the central config DB, contradicting the documented contract. There is no warning logged when the literal path is taken. **Recommendation:** Log a startup warning when `ResolveApiKey` falls through to the literal arm so an operator who accidentally committed a cleartext key sees it, and update the `GalaxyGatewayOptions` doc comment so it no longer over-promises. Consider gating the literal arm behind an explicit `dev:`-style prefix so a cleartext key cannot be used by accident. 
-**Resolution:** _(open)_ +**Resolution:** Resolved 2026-05-23 — (a) added a logger-aware `ResolveApiKey(string, ILogger?)` overload that emits a `Warning` when the back-compat literal arm is taken, and wired the `BuildClientOptions` call site to pass `_logger`; (b) added an explicit `dev:KEY` prefix that returns the literal value without warning, so dev rigs / parity tests can opt-in deliberately; (c) rewrote the `GalaxyGatewayOptions.ApiKeySecretRef` XML doc so it no longer claims "the API key never appears in cleartext config" — it now documents all four supported forms (`env:`, `file:`, `dev:`, and the warning-on-literal back-compat path). Regression coverage in `GalaxyDriverApiKeyResolverTests` (`Literal_string_emits_warning_when_logger_supplied`, `Dev_prefix_returns_literal_without_warning`, `Env_prefix_does_not_emit_literal_warning`). ### Driver.Galaxy-011 @@ -198,13 +198,13 @@ | Severity | Low | | Category | Performance & resource management | | Location | `Runtime/SubscriptionRegistry.cs:65-67`, `GalaxyDriver.cs:538`, `GalaxyDriver.cs:675` | -| Status | Open | +| Status | Resolved | **Description:** Several hot paths are O(n^2) per call. `SubscriptionRegistry.ResolveSubscribers` does `entry.Bindings.FirstOrDefault(b => b.ItemHandle == itemHandle)` — a linear scan of the whole binding list for every event dispatch; at 50k tags this is 50k-element scans on the 1Hz fan-out path. `GalaxyDriver.SubscribeAsync` and `ReadViaSubscribeOnceAsync` correlate results to references with `results.FirstOrDefault(r => string.Equals(...))` inside a `for` loop over all references — O(n^2) over the subscribe batch. `SubscriptionRegistry.Remove` rebuilds a `ConcurrentBag` from a LINQ filter on every unsubscribe. **Recommendation:** Index `SubscriptionEntry` bindings by item handle (a `Dictionary` per entry) so `ResolveSubscribers` is O(1) per subscriber. Project the `SubscribeResult` list into a `Dictionary` (OrdinalIgnoreCase) once before the correlation loop. 
These matter on the documented 50k-tag soak path. -**Resolution:** _(open)_ +**Resolution:** Resolved 2026-05-23 — three changes: (a) `SubscriptionEntry` now carries a `FullRefByItemHandle` `Dictionary` built once at construction; `ResolveSubscribers` does O(1) lookups per subscriber instead of a `FirstOrDefault` linear scan of the binding list. (b) Reverse map `_subscribersByItemHandle` swapped from `ConcurrentBag` to `ImmutableHashSet` — `Remove`/`Rebind` use `set.Remove(id)` (O(log n)) instead of "rebuild a new bag from a LINQ filter on every unsubscribe", and reads remain lock-free via atomic publication through `ConcurrentDictionary.AddOrUpdate`. (c) `GalaxyDriver.SubscribeAsync` + `ReadViaSubscribeOnceAsync` now index the `SubscribeResult` list once via the existing `BuildResultIndex` helper (already used by `ReplayAsync`) so per-reference correlation is O(1). Regression coverage in `SubscriptionRegistryTests.ResolveSubscribers_LargeBindingSet_DispatchesCorrectly`. ### Driver.Galaxy-013 @@ -213,13 +213,13 @@ | Severity | Low | | Category | Design-document adherence | | Location | `GalaxyDriver.cs:14-27`, `GalaxyDriver.cs:374-382`, `Config/GalaxyDriverOptions.cs:84-86` | -| Status | Open | +| Status | Resolved | **Description:** Multiple doc comments are stale relative to the shipped code. `GalaxyDriver`'s class summary still describes the file as "the project skeleton with `IDriver` bodies that wire to a future `IGalaxyGatewayClient` abstraction. Capability interfaces ... land in PRs 4.1-4.7" and references the legacy `GalaxyProxyDriver` coexisting "until PR 7.2" — but PR 7.2 already deleted the legacy Galaxy projects and the capability interfaces are all implemented. `ReinitializeAsync` is still a stub ("for the skeleton we just refresh health") that ignores `driverConfigJson` entirely — a config reapply silently does nothing. 
`GalaxyReconnectOptions.ReplayOnSessionLost` is defined and documented but never read anywhere in the driver (`ReplayAsync` always replays). **Recommendation:** Refresh the `GalaxyDriver` class and `ReinitializeAsync` doc comments to describe the shipped state, implement or explicitly reject `ReinitializeAsync` config reapply, and either honour `ReplayOnSessionLost` or remove it from `GalaxyReconnectOptions`. -**Resolution:** _(open)_ +**Resolution:** Resolved 2026-05-23 — three fixes: (a) rewrote the `GalaxyDriver` class summary to describe the shipped capability surface (`ITagDiscovery`, `IReadable`, `IWritable`, `ISubscribable`, `IRediscoverable`, `IHostConnectivityProbe`, `IAlarmSource`) and removed the stale "PR 4.0 skeleton" / "legacy `GalaxyProxyDriver` coexists until PR 7.2" wording — PR 7.2 already retired the legacy projects. (b) `ReinitializeAsync` now parses the incoming `driverConfigJson` through the factory pipeline and compares the result to `_options`; an equivalent reapply refreshes health, a non-equivalent change throws `NotSupportedException` so a config swap never silently no-ops. (c) `ReplayAsync` now honours `_options.Reconnect.ReplayOnSessionLost` — when false it restarts the EventPump but skips the per-tag SubscribeBulk fan-out, delegating to gateway session-level replay. Regression coverage in `GalaxyDriverInfrastructureTests` (`ReinitializeAsync_RejectsNonEquivalentConfigChange`, `ReinitializeAsync_AcceptsEquivalentConfig`, `ReplayOnSessionLost_False_SkipsResubscribeBulk`, `ReplayOnSessionLost_True_RunsResubscribeBulk`). Updated `GalaxyDriverFactoryTests.ReinitializeAsync_RefreshesHealth_WhenConfigIsEquivalent` to use an equivalent config JSON. 
### Driver.Galaxy-014 diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Config/GalaxyDriverOptions.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Config/GalaxyDriverOptions.cs index 03fb547..a5d6602 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Config/GalaxyDriverOptions.cs +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Config/GalaxyDriverOptions.cs @@ -18,9 +18,20 @@ public sealed record GalaxyDriverOptions( GalaxyReconnectOptions Reconnect); /// -/// Connection details for the MxAccess gateway. resolves -/// through the server-side secret store (DPAPI for production, environment override for -/// dev) — the API key never appears in cleartext config. +/// Connection details for the MxAccess gateway. is +/// resolved by GalaxyDriver.ResolveApiKey at InitializeAsync time. Four forms +/// supported, in priority order: +/// +/// env:NAME — read from an environment variable (recommended for +/// production; the central config DB holds only the indirection, not the key). +/// file:PATH — read from an ACL'd file outside the repo. +/// dev:KEY — explicit cleartext opt-in for dev rigs / parity tests; +/// no startup warning. +/// Anything else — treated as a literal cleartext API key for back-compat. +/// The resolver emits a Warning at startup so an operator who accidentally +/// committed a cleartext key sees it (Driver.Galaxy-010); production should +/// migrate to env: or file:. 
+/// /// // PR 6.5 tuning notes: // ConnectTimeoutSeconds = 10 — cold-start network path comfort margin; soak runs diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs index 70b6151..1a0b841 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs @@ -11,19 +11,29 @@ using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy; /// -/// In-process .NET 10 Galaxy driver — the v2 replacement for the Galaxy.Host / -/// Galaxy.Proxy pair. PR 4.0 ships the project skeleton with -/// bodies that wire to a future IGalaxyGatewayClient abstraction. Capability -/// interfaces (browse, read, write, subscribe, history routing, host probes) land in -/// PRs 4.1–4.7; the wiring sequence keeps every intermediate state buildable so the -/// Galaxy:Backend flag (PR 4.W) can flip between legacy-host and mxgateway -/// for parity testing. +/// In-process .NET 10 Galaxy driver — the only Galaxy backend since PR 7.2 retired +/// the legacy Galaxy.Host / Galaxy.Proxy / Galaxy.Shared +/// projects and the OtOpcUaGalaxyHost Windows service. Implements the full +/// capability surface: <see cref="ITagDiscovery"/>, <see cref="IReadable"/>, +/// <see cref="IWritable"/>, <see cref="ISubscribable"/>, <see cref="IRediscoverable"/>, +/// <see cref="IHostConnectivityProbe"/>, and <see cref="IAlarmSource"/>. Galaxy +/// access flows through the in-process driver over gRPC to the separately +/// installed mxaccessgw gateway (sibling repo), which owns the MXAccess +/// COM apartment server-side. /// /// -/// This driver is registered as a Tier A in-process driver alongside Modbus / S7 / etc. -/// The legacy GalaxyProxyDriver (Driver.Galaxy.Proxy) coexists until PR 7.2; -/// registers under driver-type name -/// "GalaxyMxGateway" so both paths can be live simultaneously during parity testing. +/// +/// Registered as a Tier A in-process driver alongside Modbus / S7 / etc. via +/// under driver-type name +/// "GalaxyMxGateway".
+/// +/// +/// Tests inject capability seams (, +/// , , +/// , , +/// ) via the internal ctor so capability flow +/// can be exercised without a real gw round-trip. +/// /// public sealed class GalaxyDriver : IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IRediscoverable, IHostConnectivityProbe, IAlarmSource, IDisposable, IAsyncDisposable @@ -171,6 +181,16 @@ public sealed class GalaxyDriver /// Test-visible options snapshot. internal GalaxyDriverOptions Options => _options; + /// + /// Test-visible entry into . The supervisor's + /// drives this on a + /// background task in production; tests prefer to invoke it directly so the + /// branch can be + /// asserted deterministically (Driver.Galaxy-013). + /// + internal Task InvokeReplayForTestAsync(CancellationToken cancellationToken) => + ReplayAsync(cancellationToken); + /// public async Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken) { @@ -275,6 +295,20 @@ public sealed class GalaxyDriver var entries = _subscriptions.SnapshotEntries(); if (entries.Count == 0) return; + // Driver.Galaxy-013: honor ReplayOnSessionLost. When operators opt out (false) + // we skip the per-tag SubscribeBulk fan-out — they're delegating to the + // gateway's session-level ReplaySubscriptions or accept post-reconnect tag + // loss. We still restart the EventPump so a future Subscribe call lands on + // a live consumer. + if (!_options.Reconnect.ReplayOnSessionLost) + { + RestartEventPumpForReplay(); + _logger.LogInformation( + "GalaxyDriver {InstanceId} reconnect replay skipped — ReplayOnSessionLost=false ({SubCount} subscriptions tracked)", + _driverInstanceId, entries.Count); + return; + } + // The stream-fault that triggered this recovery left the old pump's RunAsync loop // exited and its channel completed; EventPump.Start() is a no-op on a non-null but // completed loop. Recreate the pump so the replayed subscriptions have a consumer. 
@@ -380,7 +414,7 @@ public sealed class GalaxyDriver } /// - /// Resolves Gateway.ApiKeySecretRef to the actual API-key bytes. Three + /// Resolves Gateway.ApiKeySecretRef to the actual API-key bytes. Four /// forms supported, evaluated in order: /// /// env:NAME — reads Environment.GetEnvironmentVariable(NAME). @@ -389,13 +423,26 @@ public sealed class GalaxyDriver /// file:PATH — reads UTF-8 text from PATH, trimming /// whitespace. Lets operators stash the key in an ACL'd file outside the /// repo (the same pattern as the legacy .local/galaxy-host-secret.txt). - /// Anything else — used as the literal API key. Convenient for dev, - /// and avoids breaking existing configs that pre-date this resolver. + /// dev:KEY — explicit cleartext literal. The dev: prefix + /// is a deliberate opt-in signal (dev box, parity rig) so the resolver + /// doesn't emit a warning; production should never use this arm. + /// Anything else — used as the literal API key for back-compat with + /// configs that pre-date this resolver. When a logger is supplied the + /// resolver emits a startup warning so an operator who accidentally + /// committed a cleartext key sees it (Driver.Galaxy-010). /// /// A future PR can swap any of these arms for a DPAPI-backed lookup without /// changing the call site. /// - internal static string ResolveApiKey(string secretRef) + internal static string ResolveApiKey(string secretRef) => ResolveApiKey(secretRef, logger: null); + + /// + /// Logger-aware overload. Emits a <see cref="LogLevel.Warning"/> if the secret + /// ref falls through to the back-compat literal arm (an unprefixed cleartext + /// API key in DriverConfig JSON). The dev: prefix is the explicit + /// opt-in path that doesn't warn. + /// + internal static string ResolveApiKey(string secretRef, ILogger?
logger) { ArgumentException.ThrowIfNullOrEmpty(secretRef); @@ -424,13 +471,30 @@ public sealed class GalaxyDriver $"Galaxy.Gateway.ApiKeySecretRef='{secretRef}' file '{path}' is empty."); } + if (secretRef.StartsWith("dev:", StringComparison.OrdinalIgnoreCase)) + { + // Explicit dev opt-in — no warning, the operator deliberately chose a + // cleartext literal (dev box, parity rig). + return secretRef[4..]; + } + + // Back-compat literal arm. An unprefixed string is treated as the literal + // API key — but emit a warning so an operator who accidentally committed a + // cleartext key into DriverConfig sees it at startup. Use the dev: prefix + // to suppress this warning when the literal is intentional. + logger?.LogWarning( + "Galaxy.Gateway.ApiKeySecretRef is being treated as a literal cleartext API key. " + + "Prefer env:NAME, file:PATH, or the explicit dev:KEY prefix for dev rigs — " + + "a literal key in DriverConfig JSON is stored in cleartext in the central config DB."); return secretRef; } - private static MxGatewayClientOptions BuildClientOptions(GalaxyGatewayOptions gw) => new() + private MxGatewayClientOptions BuildClientOptions(GalaxyGatewayOptions gw) => new() { Endpoint = new Uri(gw.Endpoint, UriKind.Absolute), - ApiKey = ResolveApiKey(gw.ApiKeySecretRef), + // Driver.Galaxy-010: pass the logger so the literal-arm cleartext fallback + // surfaces a startup warning rather than silently shipping the key. + ApiKey = ResolveApiKey(gw.ApiKeySecretRef, _logger), UseTls = gw.UseTls, CaCertificatePath = gw.CaCertificatePath, ConnectTimeout = TimeSpan.FromSeconds(gw.ConnectTimeoutSeconds), @@ -463,15 +527,64 @@ public sealed class GalaxyDriver } /// + /// + /// + /// In-place config reapply. The driver does not currently support + /// hot-swapping at runtime — changing the + /// gateway endpoint, MxAccess client name, or reconnect policy requires + /// tearing down the gw session, supervisor, event pump, and address space. 
+ /// The host stack handles that via DriverInstance restart, so this method + /// only accepts an equivalent config (no meaningful change) and refreshes + /// health; a non-equivalent reapply throws <see cref="NotSupportedException"/> + /// so the caller knows the change wasn't applied (Driver.Galaxy-013: + /// previously the method silently ignored driverConfigJson). + /// + /// public Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken) { - // In-place config reapply. PR 4.5's reconnect supervisor will swap the - // gateway-client options under the lock; for the skeleton we just refresh health. ObjectDisposedException.ThrowIf(_disposed, this); + if (!string.IsNullOrWhiteSpace(driverConfigJson)) + { + // Materialise the incoming config and compare against the live options. We + // refuse any change that would require a session teardown rather than + // pretending to apply it. + GalaxyDriverOptions incoming; + try + { + // Reuse the factory's parse pipeline so any missing-required-field + // error surfaces with the same diagnostic text as InitializeAsync. + var transient = GalaxyDriverFactoryExtensions.CreateInstance(_driverInstanceId, driverConfigJson); + incoming = transient.Options; + // The transient instance never started a runtime — disposing is cheap. + transient.Dispose(); + } + catch (Exception ex) when (ex is not NotSupportedException and not ObjectDisposedException) + { + throw new NotSupportedException( + $"GalaxyDriver.ReinitializeAsync could not parse the incoming DriverConfig JSON for '{_driverInstanceId}': {ex.Message}", + ex); + } + + if (!OptionsAreEquivalent(_options, incoming)) + { + throw new NotSupportedException( + "GalaxyDriver.ReinitializeAsync does not support hot-swapping driver options at runtime " + + "(gateway endpoint, MxAccess client name, reconnect policy, etc.).
Restart the DriverInstance " + + "through the host stack to apply a config change."); + } + } _health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null); return Task.CompletedTask; } + /// + /// Compare two for runtime equivalence — every + /// field that drives gw session shape, address space, or reconnect behaviour + /// must match. Records get value-equality from the language, so a direct + /// equality check is enough. + /// + private static bool OptionsAreEquivalent(GalaxyDriverOptions a, GalaxyDriverOptions b) => a == b; + /// public Task ShutdownAsync(CancellationToken cancellationToken) { @@ -637,12 +750,18 @@ public sealed class GalaxyDriver } // Register bindings so the pump knows to dispatch events for these handles. + // Driver.Galaxy-012: index the SubscribeBulk results once and correlate to + // references in O(1) instead of FirstOrDefault per element (O(n²) over the + // batch). On the 50k-tag soak path this turns a 2.5G-comparison loop into + // a single Dictionary build + linear scan. + var resultIndex = BuildResultIndex(results); var bindings = new List(fullReferences.Count); for (var i = 0; i < fullReferences.Count; i++) { var fullRef = fullReferences[i]; - var match = results.FirstOrDefault(r => string.Equals(r.TagAddress, fullRef, StringComparison.OrdinalIgnoreCase)); - var itemHandle = match is { WasSuccessful: true } ? match.ItemHandle : 0; + var itemHandle = resultIndex.TryGetValue(fullRef, out var match) && match is { WasSuccessful: true } + ? match.ItemHandle + : 0; bindings.Add(new TagBinding(fullRef, itemHandle)); // Tags the gw rejected up front — complete with Bad status now so the @@ -774,12 +893,15 @@ public sealed class GalaxyDriver // recorded with a non-positive ItemHandle so the caller can detect partial failure // by inspecting the returned handle's diagnostic context — full per-tag error // surface lands in PR 5.3's parity tests. 
+ // Driver.Galaxy-012: index results once, correlate in O(1) per reference rather + // than FirstOrDefault inside the loop (O(n²) on the 50k-tag path). + var resultIndex = BuildResultIndex(results); var bindings = new List(fullReferences.Count); for (var i = 0; i < fullReferences.Count; i++) { var fullRef = fullReferences[i]; - var match = results.FirstOrDefault(r => string.Equals(r.TagAddress, fullRef, StringComparison.OrdinalIgnoreCase)); - var itemHandle = match is { WasSuccessful: true } ? match.ItemHandle : 0; + var hasMatch = resultIndex.TryGetValue(fullRef, out var match); + var itemHandle = hasMatch && match is { WasSuccessful: true } ? match.ItemHandle : 0; bindings.Add(new TagBinding(fullRef, itemHandle)); if (match is null || !match.WasSuccessful) { diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/EventPump.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/EventPump.cs index be505a7..618792e 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/EventPump.cs +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/EventPump.cs @@ -85,9 +85,14 @@ internal sealed class EventPump : IAsyncDisposable } _channel = Channel.CreateBounded(new BoundedChannelOptions(channelCapacity) { - // Newest-dropped policy: when full, the producer's TryWrite returns false - // and we account for the drop. We do this manually rather than relying on - // BoundedChannelFullMode.DropWrite so we can count drops without polling. + // Newest-dropped semantics: we use FullMode.Wait but never call the + // awaitable WriteAsync — only the synchronous TryWrite below in + // RunAsync. With Wait + TryWrite, a full channel makes TryWrite return + // false immediately, which we account for via the EventsDropped counter. 
+ // We deliberately do NOT use BoundedChannelFullMode.DropWrite — that + // would silently discard the new event inside Channel without + // surfacing the drop on a counter (Driver.Galaxy-005: keep the comment + // and the FullMode value consistent). FullMode = BoundedChannelFullMode.Wait, SingleReader = true, SingleWriter = true, diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/SubscriptionRegistry.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/SubscriptionRegistry.cs index 8873314..238b83b 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/SubscriptionRegistry.cs +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/SubscriptionRegistry.cs @@ -1,4 +1,5 @@ using System.Collections.Concurrent; +using System.Collections.Immutable; namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; @@ -18,7 +19,11 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; internal sealed class SubscriptionRegistry { private readonly ConcurrentDictionary _bySubscriptionId = new(); - private readonly ConcurrentDictionary> _subscribersByItemHandle = new(); + // Driver.Galaxy-012: use ImmutableHashSet for the reverse map so removal is + // O(log n) instead of "rebuild the entire ConcurrentBag from a LINQ filter on every + // unsubscribe"; reads are lock-free because the immutable snapshot is published + // atomically via ConcurrentDictionary AddOrUpdate. 
+ private readonly ConcurrentDictionary> _subscribersByItemHandle = new(); private long _nextSubscriptionId; public int TrackedSubscriptionCount => _bySubscriptionId.Count; @@ -42,7 +47,7 @@ internal sealed class SubscriptionRegistry _subscribersByItemHandle.AddOrUpdate( binding.ItemHandle, _ => [subscriptionId], - (_, bag) => { bag.Add(subscriptionId); return bag; }); + (_, set) => set.Add(subscriptionId)); } } @@ -57,12 +62,10 @@ internal sealed class SubscriptionRegistry foreach (var binding in entry.Bindings) { if (binding.ItemHandle <= 0) continue; - if (!_subscribersByItemHandle.TryGetValue(binding.ItemHandle, out var bag)) continue; - - // Filter the bag to drop this subscription id. ConcurrentBag has no Remove — - // rebuild it from the remaining entries. The contention here is bounded by - // the number of tags in the dropped subscription. - var remaining = new ConcurrentBag(bag.Where(id => id != subscriptionId)); + // Driver.Galaxy-012: ImmutableHashSet.Remove is O(log n) and the result is + // published atomically — no need to rebuild from a LINQ filter. + if (!_subscribersByItemHandle.TryGetValue(binding.ItemHandle, out var set)) continue; + var remaining = set.Remove(subscriptionId); if (remaining.IsEmpty) _subscribersByItemHandle.TryRemove(binding.ItemHandle, out _); else _subscribersByItemHandle[binding.ItemHandle] = remaining; } @@ -74,18 +77,23 @@ internal sealed class SubscriptionRegistry /// Look up the (subscription id, full reference) pairs that should receive an /// OnDataChange for the given gw item handle. Returns empty when nobody subscribes. /// + /// + /// Driver.Galaxy-012: O(1) per subscriber via the per-entry + /// FullRefByItemHandle index, rather than a FirstOrDefault linear + /// scan of the binding list. At 50k tags / 1Hz this turns each dispatch from a + /// 50k-element scan into a single dictionary lookup. 
+ /// public IReadOnlyList<(long SubscriptionId, string FullReference)> ResolveSubscribers(int itemHandle) { if (!_subscribersByItemHandle.TryGetValue(itemHandle, out var bag)) return []; // Each subscription may include the tag once. Walk every active subscription that - // claims this handle and pull the full ref from its binding list. + // claims this handle and pull the full ref from its index in O(1). var result = new List<(long, string)>(); foreach (var subId in bag.Distinct()) { if (!_bySubscriptionId.TryGetValue(subId, out var entry)) continue; - var binding = entry.Bindings.FirstOrDefault(b => b.ItemHandle == itemHandle); - if (binding is { FullReference: { } fullRef }) + if (entry.FullRefByItemHandle.TryGetValue(itemHandle, out var fullRef)) result.Add((subId, fullRef)); } return result; @@ -113,14 +121,14 @@ internal sealed class SubscriptionRegistry { if (!_bySubscriptionId.TryGetValue(subscriptionId, out var oldEntry)) return; - // Drop this subscription from every reverse-map bag it currently appears in. The + // Drop this subscription from every reverse-map set it currently appears in. The // pre-reconnect item handles are stale once the gw re-issues fresh ones. + // Driver.Galaxy-012: ImmutableHashSet.Remove is O(log n) — no LINQ rebuild. 
foreach (var binding in oldEntry.Bindings) { if (binding.ItemHandle <= 0) continue; - if (!_subscribersByItemHandle.TryGetValue(binding.ItemHandle, out var bag)) continue; - - var remaining = new ConcurrentBag(bag.Where(id => id != subscriptionId)); + if (!_subscribersByItemHandle.TryGetValue(binding.ItemHandle, out var set)) continue; + var remaining = set.Remove(subscriptionId); if (remaining.IsEmpty) _subscribersByItemHandle.TryRemove(binding.ItemHandle, out _); else _subscribersByItemHandle[binding.ItemHandle] = remaining; } @@ -132,11 +140,38 @@ internal sealed class SubscriptionRegistry _subscribersByItemHandle.AddOrUpdate( binding.ItemHandle, _ => [subscriptionId], - (_, bag) => { bag.Add(subscriptionId); return bag; }); + (_, set) => set.Add(subscriptionId)); } } - private sealed record SubscriptionEntry(long SubscriptionId, IReadOnlyList Bindings); + /// + /// Per-subscription bookkeeping. is an index + /// over keyed by item handle so ResolveSubscribers + /// is O(1) per subscriber instead of a linear scan of every binding + /// (Driver.Galaxy-012). Failed bindings (item handle ≤ 0) are excluded from the + /// index because the EventPump only dispatches for positive handles. + /// + private sealed class SubscriptionEntry + { + public long SubscriptionId { get; } + public IReadOnlyList Bindings { get; } + public IReadOnlyDictionary FullRefByItemHandle { get; } + + public SubscriptionEntry(long subscriptionId, IReadOnlyList bindings) + { + SubscriptionId = subscriptionId; + Bindings = bindings; + var index = new Dictionary(bindings.Count); + foreach (var binding in bindings) + { + if (binding.ItemHandle <= 0) continue; // failed gw subscribe — no events expected + // Last-write-wins on duplicates; the driver doesn't double-register a handle + // within a single subscription, but be defensive. 
+ index[binding.ItemHandle] = binding.FullReference; + } + FullRefByItemHandle = index; + } + } } /// diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/GalaxyDriverApiKeyResolverTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/GalaxyDriverApiKeyResolverTests.cs index 21e491a..5976cf9 100644 --- a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/GalaxyDriverApiKeyResolverTests.cs +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/GalaxyDriverApiKeyResolverTests.cs @@ -1,3 +1,4 @@ +using Microsoft.Extensions.Logging; using Shouldly; using Xunit; @@ -69,6 +70,61 @@ public sealed class GalaxyDriverApiKeyResolverTests ex.Message.ShouldContain("doesn't exist"); } + // ===== Driver.Galaxy-010 regression: literal arm warns + dev: prefix path ===== + + [Fact] + public void Literal_string_emits_warning_when_logger_supplied() + { + // A literal API key on a production deployment means the cleartext key sits + // in the DriverConfig JSON. The resolver must surface a warning so an + // operator who committed one by accident sees it at startup. + var logger = new CaptureLogger(); + var key = GalaxyDriver.ResolveApiKey("plain-text-key", logger); + + key.ShouldBe("plain-text-key"); + logger.Entries.ShouldContain(e => + e.Level == LogLevel.Warning && e.Message.Contains("literal", StringComparison.OrdinalIgnoreCase)); + } + + [Fact] + public void Dev_prefix_returns_literal_without_warning() + { + // An explicit dev: prefix signals the operator knowingly opted into a literal + // key (dev / parity rig). The resolver must accept it AND suppress the + // warning so production logs aren't polluted on a deliberate dev choice. 
+ var logger = new CaptureLogger(); + var key = GalaxyDriver.ResolveApiKey("dev:plain-text-key", logger); + + key.ShouldBe("plain-text-key"); + logger.Entries.ShouldNotContain(e => e.Level == LogLevel.Warning); + } + + [Fact] + public void Env_prefix_does_not_emit_literal_warning() + { + const string name = "OTOPCUA_TEST_GALAXY_API_KEY_NOWARN"; + Environment.SetEnvironmentVariable(name, "v"); + try + { + var logger = new CaptureLogger(); + GalaxyDriver.ResolveApiKey($"env:{name}", logger); + logger.Entries.ShouldNotContain(e => e.Level == LogLevel.Warning); + } + finally + { + Environment.SetEnvironmentVariable(name, null); + } + } + + private sealed class CaptureLogger : ILogger + { + public List<(LogLevel Level, string Message)> Entries { get; } = new(); + public IDisposable? BeginScope(TState state) where TState : notnull => null; + public bool IsEnabled(LogLevel logLevel) => true; + public void Log(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func formatter) + => Entries.Add((logLevel, formatter(state, exception))); + } + [Fact] public void File_prefix_empty_file_throws() { diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/GalaxyDriverFactoryTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/GalaxyDriverFactoryTests.cs index b566bdc..41aa305 100644 --- a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/GalaxyDriverFactoryTests.cs +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/GalaxyDriverFactoryTests.cs @@ -141,17 +141,29 @@ public sealed class GalaxyDriverFactoryTests } [Fact] - public async Task ReinitializeAsync_RefreshesHealth() + public async Task ReinitializeAsync_RefreshesHealth_WhenConfigIsEquivalent() { + // Driver.Galaxy-013: ReinitializeAsync now compares the incoming JSON to the + // live options. 
An equivalent config is accepted and refreshes health; a + // non-equivalent reapply throws NotSupportedException (covered in + // GalaxyDriverInfrastructureTests.ReinitializeAsync_RejectsNonEquivalentConfigChange). + // Build a config JSON whose parsed shape equals BuildOptions() so the + // equivalence check passes. + const string equivalentConfig = """ + { + "Gateway": { "Endpoint": "https://mxgw.test:5001", "ApiKeySecretRef": "key" }, + "MxAccess": { "ClientName": "OtOpcUa-A" } + } + """; using var driver = new GalaxyDriver( "galaxy-x", BuildOptions(), hierarchySource: null, dataReader: null, dataWriter: null, subscriber: new NoopSubscriber()); - await driver.InitializeAsync(MinimalConfig, CancellationToken.None); + await driver.InitializeAsync(equivalentConfig, CancellationToken.None); var firstStamp = driver.GetHealth().LastSuccessfulRead!.Value; // Force a measurable clock delta so the comparison is stable on fast machines. await Task.Delay(20); - await driver.ReinitializeAsync(MinimalConfig, CancellationToken.None); + await driver.ReinitializeAsync(equivalentConfig, CancellationToken.None); driver.GetHealth().State.ShouldBe(DriverState.Healthy); driver.GetHealth().LastSuccessfulRead!.Value.ShouldBeGreaterThan(firstStamp); diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/GalaxyDriverInfrastructureTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/GalaxyDriverInfrastructureTests.cs index bdf9304..c334caa 100644 --- a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/GalaxyDriverInfrastructureTests.cs +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/GalaxyDriverInfrastructureTests.cs @@ -85,6 +85,121 @@ public sealed class GalaxyDriverInfrastructureTests await Should.NotThrowAsync(async () => await driver.DisposeAsync()); } + // ===== Driver.Galaxy-013 regression: ReplayOnSessionLost gates the replay step ===== + + [Fact] + public async Task ReplayOnSessionLost_False_SkipsResubscribeBulk() + { + // ReplayOnSessionLost was 
a dangling option — defined + documented but never + // read. After the fix, setting it to false makes the reconnect replay path + // skip SubscribeBulk (operator opts out of replay; the gateway's session-level + // ReplaySubscriptions handles state restoration). + var sub = new ReplayCountingSubscriber(); + var opts = new GalaxyDriverOptions( + new GalaxyGatewayOptions("https://mxgw.test:5001", "key"), + new GalaxyMxAccessOptions("InfraTest"), + new GalaxyRepositoryOptions(WatchDeployEvents: false), + new GalaxyReconnectOptions(ReplayOnSessionLost: false)); + + using var driver = new GalaxyDriver("drv-1", opts, null, null, null, sub); + + // Establish a subscription so the replay path has something to walk. + await driver.SubscribeAsync(["Tag.A", "Tag.B"], TimeSpan.Zero, CancellationToken.None); + sub.SubscribeCalls.ShouldBe(1); + + // Invoke the replay path directly via the internal test seam — the supervisor's + // ReportTransportFailure spins it up async; for a deterministic assertion we + // call the helper that ReplayAsync is wired against. 
+ await driver.InvokeReplayForTestAsync(CancellationToken.None); + + sub.SubscribeCalls.ShouldBe(1, + "ReplayOnSessionLost=false must skip the re-SubscribeBulk fan-out on reconnect"); + } + + [Fact] + public async Task ReplayOnSessionLost_True_RunsResubscribeBulk() + { + var sub = new ReplayCountingSubscriber(); + var opts = new GalaxyDriverOptions( + new GalaxyGatewayOptions("https://mxgw.test:5001", "key"), + new GalaxyMxAccessOptions("InfraTest"), + new GalaxyRepositoryOptions(WatchDeployEvents: false), + new GalaxyReconnectOptions(ReplayOnSessionLost: true)); + + using var driver = new GalaxyDriver("drv-1", opts, null, null, null, sub); + + await driver.SubscribeAsync(["Tag.A"], TimeSpan.Zero, CancellationToken.None); + sub.SubscribeCalls.ShouldBe(1); + + await driver.InvokeReplayForTestAsync(CancellationToken.None); + + sub.SubscribeCalls.ShouldBe(2, + "default ReplayOnSessionLost=true must re-issue SubscribeBulk after a transport drop"); + } + + private sealed class ReplayCountingSubscriber : IGalaxySubscriber + { + private readonly Channel _stream = Channel.CreateUnbounded(); + private int _nextHandle = 1; + public int SubscribeCalls; + + public Task> SubscribeBulkAsync( + IReadOnlyList fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken) + { + Interlocked.Increment(ref SubscribeCalls); + var results = fullReferences.Select(r => new SubscribeResult + { + TagAddress = r, + ItemHandle = Interlocked.Increment(ref _nextHandle), + WasSuccessful = true, + }).ToList(); + return Task.FromResult>(results); + } + + public Task UnsubscribeBulkAsync(IReadOnlyList itemHandles, CancellationToken cancellationToken) + => Task.CompletedTask; + + public IAsyncEnumerable StreamEventsAsync(CancellationToken cancellationToken) + => _stream.Reader.ReadAllAsync(cancellationToken); + } + + // ===== Driver.Galaxy-013 regression: ReinitializeAsync rejects unsupported reapply ===== + + [Fact] + public async Task 
ReinitializeAsync_RejectsNonEquivalentConfigChange() + { + // ReinitializeAsync was previously a silent no-op that ignored driverConfigJson. + // After the fix it either applies an equivalent config (no-op) or throws + // NotSupportedException so a config change isn't silently dropped. + var sub = new NoOpSubscriber(); + using var driver = new GalaxyDriver("drv-1", Opts(), null, null, null, sub); + + const string newConfig = "{\"Gateway\":{\"Endpoint\":\"https://other.test:5001\",\"ApiKeySecretRef\":\"dev:other\"}}"; + + // The driver must NOT pretend the change was applied — either no-op equivalence + // or an explicit rejection is acceptable. Silently dropping the new config + // (the previous behaviour) is not. + await Should.ThrowAsync(async () => + await driver.ReinitializeAsync(newConfig, CancellationToken.None)); + } + + [Fact] + public async Task ReinitializeAsync_AcceptsEquivalentConfig() + { + var sub = new NoOpSubscriber(); + using var driver = new GalaxyDriver("drv-1", Opts(), null, null, null, sub); + + // An empty / null-equivalent config reapply (no field changes) must not throw — + // it's a legitimate "refresh health" path. Pass a JSON object that round-trips + // to the driver's current options. 
+ var json = "{\"Gateway\":{\"Endpoint\":\"https://mxgw.test:5001\",\"ApiKeySecretRef\":\"key\"}," + + "\"MxAccess\":{\"ClientName\":\"InfraTest\"}," + + "\"Repository\":{\"WatchDeployEvents\":false}," + + "\"Reconnect\":{}}"; + await Should.NotThrowAsync(async () => + await driver.ReinitializeAsync(json, CancellationToken.None)); + } + // ===== Minimal IGalaxySubscriber fake that returns empty results for subscribe calls ===== private sealed class NoOpSubscriber : IGalaxySubscriber diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/SubscriptionRegistryTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/SubscriptionRegistryTests.cs index 7eaec95..9736b46 100644 --- a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/SubscriptionRegistryTests.cs +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/SubscriptionRegistryTests.cs @@ -183,6 +183,36 @@ public sealed class SubscriptionRegistryTests registry.ResolveSubscribers(0).ShouldBeEmpty(); } + // ===== Driver.Galaxy-012 regression: ResolveSubscribers is O(1) per binding ===== + + [Fact] + public void ResolveSubscribers_LargeBindingSet_DispatchesCorrectly() + { + // 5000-tag subscription. ResolveSubscribers must still return the right + // full-reference for any item handle without a linear scan of the entire + // binding list — the old FirstOrDefault(b => b.ItemHandle == h) was O(n) + // per dispatch, so 50k tags × 1Hz fan-out was 50k linear scans per second. + var registry = new SubscriptionRegistryAccess(); + const int tagCount = 5000; + var bindings = new List(tagCount); + for (var i = 0; i < tagCount; i++) + { + bindings.Add(new TagBindingAccess($"Tag.{i}", 1000 + i)); + } + registry.Register(1, bindings); + + // Pull the last entry — the worst case for a linear scan. 
+ var subs = registry.ResolveSubscribers(1000 + tagCount - 1); + + subs.Count.ShouldBe(1); + subs[0].FullReference.ShouldBe($"Tag.{tagCount - 1}"); + + // Mid-range entry too — proves the index isn't position-dependent. + var mid = registry.ResolveSubscribers(1000 + tagCount / 2); + mid.Count.ShouldBe(1); + mid[0].FullReference.ShouldBe($"Tag.{tagCount / 2}"); + } + // Internal types are accessed via friend assembly (InternalsVisibleTo); these // wrapper aliases keep the test code readable. private sealed class SubscriptionRegistryAccess