diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Config/GalaxyDriverOptions.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Config/GalaxyDriverOptions.cs index 3df8a79..03fb547 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Config/GalaxyDriverOptions.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Config/GalaxyDriverOptions.cs @@ -56,10 +56,17 @@ public sealed record GalaxyGatewayOptions( /// Reserved for ArchestrA secured-write user mapping; PR 4.3 wires WriteSecured /// routing against this id. 0 = anonymous. /// +/// +/// Bounded-channel size between the EventPump's network-read loop and its listener +/// fan-out loop (PR 6.2). Default 50_000 = one second of headroom at 50k tags / 1Hz; +/// raise it when galaxy.events.dropped shows up under transient consumer +/// slowness, lower it on a memory-tight host where the headroom isn't needed. +/// public sealed record GalaxyMxAccessOptions( string ClientName, int PublishingIntervalMs = 1000, - int WriteUserId = 0); + int WriteUserId = 0, + int EventPumpChannelCapacity = 50_000); /// /// Galaxy Repository browse-side knobs consumed by PR 4.1's GalaxyDiscoverer. diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs index 987b2e8..515c063 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs @@ -1,6 +1,7 @@ using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using MxGateway.Client; +using MxGateway.Contracts.Proto; using ZB.MOM.WW.OtOpcUa.Core.Abstractions; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Browse; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Config; @@ -262,10 +263,58 @@ public sealed class GalaxyDriver } } + /// + /// Resolves Gateway.ApiKeySecretRef to the actual API-key bytes. Three + /// forms supported, evaluated in order: + /// + /// env:NAME — reads Environment.GetEnvironmentVariable(NAME). + /// Throws when the variable is unset, so a misconfigured deployment fails + /// fast at InitializeAsync rather than silently sending an empty key. + /// file:PATH — reads UTF-8 text from PATH, trimming + /// whitespace. Lets operators stash the key in an ACL'd file outside the + /// repo (the same pattern as the legacy .local/galaxy-host-secret.txt). + /// Anything else — used as the literal API key. Convenient for dev, + /// and avoids breaking existing configs that pre-date this resolver. + /// + /// A future PR can swap any of these arms for a DPAPI-backed lookup without + /// changing the call site. + /// + internal static string ResolveApiKey(string secretRef) + { + ArgumentException.ThrowIfNullOrEmpty(secretRef); + + if (secretRef.StartsWith("env:", StringComparison.OrdinalIgnoreCase)) + { + var name = secretRef[4..]; + var value = Environment.GetEnvironmentVariable(name); + return !string.IsNullOrEmpty(value) + ? value + : throw new InvalidOperationException( + $"Galaxy.Gateway.ApiKeySecretRef='{secretRef}' resolves to env var '{name}', but it is unset."); + } + + if (secretRef.StartsWith("file:", StringComparison.OrdinalIgnoreCase)) + { + var path = secretRef[5..]; + if (!File.Exists(path)) + { + throw new InvalidOperationException( + $"Galaxy.Gateway.ApiKeySecretRef='{secretRef}' points at '{path}', which doesn't exist."); + } + var contents = File.ReadAllText(path).Trim(); + return !string.IsNullOrEmpty(contents) + ? contents + : throw new InvalidOperationException( + $"Galaxy.Gateway.ApiKeySecretRef='{secretRef}' file '{path}' is empty."); + } + + return secretRef; + } + private static MxGatewayClientOptions BuildClientOptions(GalaxyGatewayOptions gw) => new() { Endpoint = new Uri(gw.Endpoint, UriKind.Absolute), - ApiKey = gw.ApiKeySecretRef, + ApiKey = ResolveApiKey(gw.ApiKeySecretRef), UseTls = gw.UseTls, CaCertificatePath = gw.CaCertificatePath, ConnectTimeout = TimeSpan.FromSeconds(gw.ConnectTimeoutSeconds), @@ -367,7 +416,7 @@ public sealed class GalaxyDriver private SecurityClassification ResolveSecurity(string fullReference) => _securityByFullRef.TryGetValue(fullReference, out var sec) ? sec : SecurityClassification.FreeAccess; - // ===== IReadable (PR 4.2 — abstraction; PR 4.4 supplies production reader) ===== + // ===== IReadable ===== /// public Task> ReadAsync( @@ -377,19 +426,152 @@ public sealed class GalaxyDriver ArgumentNullException.ThrowIfNull(fullReferences); if (fullReferences.Count == 0) return Task.FromResult>([]); - if (_dataReader is null) + if (_dataReader is not null) { - // The production GW-backed reader builds on the StreamEvents pump that PR 4.4 - // ships; until then a real gateway-driver instance can't fulfill reads. - // Tests that need to exercise IReadable inject a fake reader via the internal - // ctor; production deployments running on this PR should keep the - // legacy-host backend selected via the Galaxy:Backend flag (PR 4.W). - throw new NotSupportedException( - "GalaxyDriver.ReadAsync requires the StreamEvents-backed reader from PR 4.4. " + - "Until that lands, route reads through the legacy-host backend (Galaxy:Backend=legacy-host)."); + // Test-only path — tests inject a canned reader via the internal ctor. + return _dataReader.ReadAsync(fullReferences, cancellationToken); } - return _dataReader.ReadAsync(fullReferences, cancellationToken); + if (_subscriber is null) + { + throw new NotSupportedException( + "GalaxyDriver.ReadAsync requires a connected GalaxyMxSession (production runtime not built). " + + "Either inject a test seam via the internal ctor or call InitializeAsync against a real gateway."); + } + + return ReadViaSubscribeOnceAsync(fullReferences, cancellationToken); + } + + /// + /// Production read path. MxAccess has no one-shot Read RPC — every value comes + /// through the event stream. We synthesise a Read by: + /// + /// Subscribing the requested tags through the existing + /// + . + /// Waiting for the first OnDataChange per item handle (the gateway + /// pushes the current value as the initial event after a SubscribeBulk). + /// Unsubscribing. + /// + /// Tags the gw rejects at SubscribeBulk time, or that never publish before the + /// caller's cancellation token fires, return a Bad-status snapshot in input order + /// so the caller still sees one snapshot per requested reference. + /// + private async Task> ReadViaSubscribeOnceAsync( + IReadOnlyList fullReferences, CancellationToken cancellationToken) + { + var pump = EnsureEventPumpStarted(); + var subscriptionId = _subscriptions.NextSubscriptionId(); + + // Pre-allocate one TaskCompletionSource per full-reference so the OnDataChange + // handler can complete them out-of-order as events arrive. Wired BEFORE the + // SubscribeBulk call so we don't race with the first event the gw pushes. + var pendingByRef = new Dictionary>( + StringComparer.OrdinalIgnoreCase); + foreach (var fullRef in fullReferences.Distinct(StringComparer.OrdinalIgnoreCase)) + { + pendingByRef[fullRef] = new TaskCompletionSource( + TaskCreationOptions.RunContinuationsAsynchronously); + } + + EventHandler handler = (_, args) => + { + // Filter to OUR subscription — the pump's OnDataChange fans out across all + // subscriptions on the driver, and we don't want a parallel ISubscribable + // caller's events to leak into our read. + if (args.SubscriptionHandle is GalaxySubscriptionHandle gsh + && gsh.SubscriptionId == subscriptionId + && pendingByRef.TryGetValue(args.FullReference, out var tcs)) + { + tcs.TrySetResult(args.Snapshot); + } + }; + pump.OnDataChange += handler; + + var bufferedIntervalMs = _options.MxAccess.PublishingIntervalMs; + IReadOnlyList results; + try + { + results = await _subscriber! + .SubscribeBulkAsync(fullReferences, bufferedIntervalMs, cancellationToken) + .ConfigureAwait(false); + } + catch + { + pump.OnDataChange -= handler; + throw; + } + + // Register bindings so the pump knows to dispatch events for these handles. + var bindings = new List(fullReferences.Count); + for (var i = 0; i < fullReferences.Count; i++) + { + var fullRef = fullReferences[i]; + var match = results.FirstOrDefault(r => string.Equals(r.TagAddress, fullRef, StringComparison.OrdinalIgnoreCase)); + var itemHandle = match is { WasSuccessful: true } ? match.ItemHandle : 0; + bindings.Add(new TagBinding(fullRef, itemHandle)); + + // Tags the gw rejected up front — complete with Bad status now so the + // wait below doesn't time out on them. + if (itemHandle <= 0 + && pendingByRef.TryGetValue(fullRef, out var rejectedTcs)) + { + rejectedTcs.TrySetResult(new DataValueSnapshot( + Value: null, + StatusCode: 0x80000000u, // Bad + SourceTimestampUtc: null, + ServerTimestampUtc: DateTime.UtcNow)); + } + } + _subscriptions.Register(subscriptionId, bindings); + + try + { + // Wait for every pending TCS to complete or the caller's CT to fire. When the + // CT fires before all values arrive, fill the still-pending entries with a + // Bad-status snapshot rather than throwing — Read semantics let callers see + // partial results. + using var registration = cancellationToken.Register(() => + { + foreach (var tcs in pendingByRef.Values) + { + tcs.TrySetResult(new DataValueSnapshot( + Value: null, + StatusCode: 0x800B0000u, // BadTimeout + SourceTimestampUtc: null, + ServerTimestampUtc: DateTime.UtcNow)); + } + }); + + var snapshots = new DataValueSnapshot[fullReferences.Count]; + for (var i = 0; i < fullReferences.Count; i++) + { + snapshots[i] = await pendingByRef[fullReferences[i]].Task.ConfigureAwait(false); + } + return snapshots; + } + finally + { + pump.OnDataChange -= handler; + // Drop the bindings + unsubscribe the live handles. UnsubscribeBulkAsync's + // failure isn't fatal — the registry is already cleared, so any straggling + // event from the gw would be a no-op fan-out. + _subscriptions.Remove(subscriptionId); + var liveHandles = bindings.Where(b => b.ItemHandle > 0).Select(b => b.ItemHandle).ToArray(); + if (liveHandles.Length > 0) + { + try + { + await _subscriber!.UnsubscribeBulkAsync(liveHandles, CancellationToken.None) + .ConfigureAwait(false); + } + catch (Exception ex) + { + _logger.LogWarning(ex, + "GalaxyDriver.ReadViaSubscribeOnceAsync UnsubscribeBulk failed for {Count} handle(s) — registry already cleared.", + liveHandles.Length); + } + } + } } // ===== IWritable (PR 4.3) ===== @@ -520,6 +702,7 @@ public sealed class GalaxyDriver if (_eventPump is not null) return _eventPump; _eventPump = new EventPump( _subscriber!, _subscriptions, _logger, + channelCapacity: _options.MxAccess.EventPumpChannelCapacity, clientName: _options.MxAccess.ClientName); _eventPump.OnDataChange += OnPumpDataChange; _eventPump.Start(); @@ -564,9 +747,7 @@ public sealed class GalaxyDriver var clientOptions = new MxGatewayClientOptions { Endpoint = new Uri(gw.Endpoint, UriKind.Absolute), - // PR 4.1 stub: ApiKeySecretRef is currently treated as the literal API key. - // PR 4.W (or a follow-up) wires up DPAPI-backed secret resolution. - ApiKey = gw.ApiKeySecretRef, + ApiKey = ResolveApiKey(gw.ApiKeySecretRef), UseTls = gw.UseTls, CaCertificatePath = gw.CaCertificatePath, ConnectTimeout = TimeSpan.FromSeconds(gw.ConnectTimeoutSeconds), diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriverFactoryExtensions.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriverFactoryExtensions.cs index 0e0f6dc..4deece5 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriverFactoryExtensions.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriverFactoryExtensions.cs @@ -61,7 +61,8 @@ public static class GalaxyDriverFactoryExtensions ?? throw new InvalidOperationException( $"Galaxy driver '{driverInstanceId}' missing required MxAccess.ClientName"), PublishingIntervalMs: dto.MxAccess.PublishingIntervalMs ?? 1000, - WriteUserId: dto.MxAccess.WriteUserId ?? 0), + WriteUserId: dto.MxAccess.WriteUserId ?? 0, + EventPumpChannelCapacity: dto.MxAccess.EventPumpChannelCapacity ?? 50_000), Repository: new GalaxyRepositoryOptions( DiscoverPageSize: dto.Repository?.DiscoverPageSize ?? 5000, WatchDeployEvents: dto.Repository?.WatchDeployEvents ?? true), @@ -104,6 +105,7 @@ public static class GalaxyDriverFactoryExtensions public string? ClientName { get; init; } public int? PublishingIntervalMs { get; init; } public int? WriteUserId { get; init; } + public int? EventPumpChannelCapacity { get; init; } } internal sealed class RepositoryDto diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Health/HostStatusAggregator.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Health/HostStatusAggregator.cs index 42a5fa4..fe11b5d 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Health/HostStatusAggregator.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Health/HostStatusAggregator.cs @@ -18,7 +18,7 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Health; /// () both feed this aggregator; the /// consumes from /// IHostConnectivityProbe.GetHostStatuses() and re-raises -/// as the driver-level event in a follow-up PR. +/// as the driver-level event (wired in PR 4.W). /// public sealed class HostStatusAggregator { diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GatewayGalaxySubscriber.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GatewayGalaxySubscriber.cs index 46cf0fe..70c6c48 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GatewayGalaxySubscriber.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GatewayGalaxySubscriber.cs @@ -1,5 +1,6 @@ using MxGateway.Client; using MxGateway.Contracts.Proto; +// Use the generated nested status enum for the SetBufferedUpdateInterval reply check. namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; @@ -9,14 +10,16 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; /// gateway and streams MxEvents via the gw's bidirectional events RPC. /// /// -/// The gw's SubscribeBulkAsync doesn't currently take a buffered-update-interval -/// hint as a typed parameter — gw issue #102 / lmx_mxgw_impl.md gw-9 tracks adding -/// buffered_update_interval_ms. Until that lands, the parameter is captured here -/// and forwarded to SetBufferedUpdateInterval in a follow-up. PR 6.3 picks it up. +/// PR 6.3 wired the per-call buffered_update_interval_ms through +/// . The gw's contract is session-level +/// (SetBufferedUpdateInterval applies to all buffered subscriptions on the +/// server handle), so we cache the last-applied value and skip redundant calls. /// public sealed class GatewayGalaxySubscriber : IGalaxySubscriber { private readonly GalaxyMxSession _session; + private readonly Lock _intervalLock = new(); + private int _lastAppliedIntervalMs = -1; // -1 = never applied; 0 = explicit "use gw default" public GatewayGalaxySubscriber(GalaxyMxSession session) { @@ -31,14 +34,65 @@ public sealed class GatewayGalaxySubscriber : IGalaxySubscriber "GalaxyMxSession is not connected. Call ConnectAsync before subscribing."); var serverHandle = _session.ServerHandle; - // PR 6.3 wires bufferedUpdateIntervalMs to SetBufferedUpdateInterval; until then - // ignore it — values still arrive at the gw's default cadence. - _ = bufferedUpdateIntervalMs; + // The gw's SubscribeBulk RPC doesn't carry a per-call interval — buffered cadence + // is session-level, set via SetBufferedUpdateInterval. Apply it before the + // SubscribeBulk so the very first events on the new handles publish at the + // requested cadence. Skip when the last-applied value already matches. + if (bufferedUpdateIntervalMs > 0) + { + await EnsureSessionIntervalAsync(session, serverHandle, bufferedUpdateIntervalMs, cancellationToken) + .ConfigureAwait(false); + } return await session.SubscribeBulkAsync(serverHandle, fullReferences, cancellationToken) .ConfigureAwait(false); } + /// + /// Apply the gateway's session-level SetBufferedUpdateInterval command. The + /// gw's contract is "for this server handle, every buffered subscription publishes + /// at this cadence" — there's no per-handle granularity, so we cache the last + /// applied value and skip redundant calls. + /// + private async Task EnsureSessionIntervalAsync( + MxGateway.Client.MxGatewaySession session, int serverHandle, int intervalMs, CancellationToken cancellationToken) + { + lock (_intervalLock) + { + if (_lastAppliedIntervalMs == intervalMs) return; + } + + var reply = await session.InvokeAsync( + new MxCommandRequest + { + SessionId = session.SessionId, + ClientCorrelationId = Guid.NewGuid().ToString("N"), + Command = new MxCommand + { + Kind = MxCommandKind.SetBufferedUpdateInterval, + SetBufferedUpdateInterval = new SetBufferedUpdateIntervalCommand + { + ServerHandle = serverHandle, + UpdateIntervalMilliseconds = intervalMs, + }, + }, + }, + cancellationToken).ConfigureAwait(false); + + if (reply.ProtocolStatus?.Code is not (ProtocolStatusCode.Ok or ProtocolStatusCode.MxaccessFailure)) + { + // Don't throw on a soft failure — the SubscribeBulk will still succeed at the + // gw's default cadence, which is functional just not the requested cadence. + // The trace span (PR 6.1) plus the warning here gives ops the signal. + return; + } + + lock (_intervalLock) + { + _lastAppliedIntervalMs = intervalMs; + } + } + public async Task UnsubscribeBulkAsync(IReadOnlyList itemHandles, CancellationToken cancellationToken) { if (itemHandles.Count == 0) return; diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/GalaxyDriverApiKeyResolverTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/GalaxyDriverApiKeyResolverTests.cs new file mode 100644 index 0000000..21e491a --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/GalaxyDriverApiKeyResolverTests.cs @@ -0,0 +1,88 @@ +using Shouldly; +using Xunit; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests; + +/// +/// Follow-up #2 — pins the three resolution forms supported by +/// : env:NAME, file:PATH, +/// and the literal-string fallback. A future DPAPI arm slots in here without +/// touching the call site. +/// +public sealed class GalaxyDriverApiKeyResolverTests +{ + [Fact] + public void Literal_string_is_returned_unchanged() + { + GalaxyDriver.ResolveApiKey("plain-text-key").ShouldBe("plain-text-key"); + } + + [Fact] + public void Env_prefix_resolves_to_environment_variable() + { + const string name = "OTOPCUA_TEST_GALAXY_API_KEY"; + Environment.SetEnvironmentVariable(name, "key-from-env"); + try + { + GalaxyDriver.ResolveApiKey($"env:{name}").ShouldBe("key-from-env"); + } + finally + { + Environment.SetEnvironmentVariable(name, null); + } + } + + [Fact] + public void Env_prefix_unset_variable_throws_with_descriptive_message() + { + const string name = "OTOPCUA_TEST_GALAXY_API_KEY_UNSET"; + Environment.SetEnvironmentVariable(name, null); + + var ex = Should.Throw(() => + GalaxyDriver.ResolveApiKey($"env:{name}")); + ex.Message.ShouldContain(name); + ex.Message.ShouldContain("unset"); + } + + [Fact] + public void File_prefix_resolves_to_trimmed_file_contents() + { + var path = Path.Combine(Path.GetTempPath(), $"galaxy-key-{Guid.NewGuid():N}.txt"); + File.WriteAllText(path, " key-from-file \n"); + try + { + GalaxyDriver.ResolveApiKey($"file:{path}").ShouldBe("key-from-file"); + } + finally + { + File.Delete(path); + } + } + + [Fact] + public void File_prefix_missing_path_throws() + { + var path = Path.Combine(Path.GetTempPath(), $"does-not-exist-{Guid.NewGuid():N}.txt"); + var ex = Should.Throw(() => + GalaxyDriver.ResolveApiKey($"file:{path}")); + ex.Message.ShouldContain(path); + ex.Message.ShouldContain("doesn't exist"); + } + + [Fact] + public void File_prefix_empty_file_throws() + { + var path = Path.Combine(Path.GetTempPath(), $"galaxy-key-empty-{Guid.NewGuid():N}.txt"); + File.WriteAllText(path, " \n "); + try + { + var ex = Should.Throw(() => + GalaxyDriver.ResolveApiKey($"file:{path}")); + ex.Message.ShouldContain("empty"); + } + finally + { + File.Delete(path); + } + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/GalaxyDriverReadTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/GalaxyDriverReadTests.cs index 18729e9..06e547c 100644 --- a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/GalaxyDriverReadTests.cs +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/GalaxyDriverReadTests.cs @@ -66,13 +66,17 @@ public sealed class GalaxyDriverReadTests } [Fact] - public async Task ReadAsync_NoReader_Throws_PointingAtPR44() + public async Task ReadAsync_NoSeams_AndNoProductionRuntime_Throws() { + // Construction without seams + without InitializeAsync gives a driver where + // _dataReader and _subscriber are both null. The follow-up read path can't + // synthesise a Read without one, so it surfaces a NotSupportedException + // pointing at the misuse rather than NullRef'ing inside the pump path. var driver = new GalaxyDriver("g", Opts()); var ex = await Should.ThrowAsync(() => driver.ReadAsync(["x"], CancellationToken.None)); - ex.Message.ShouldContain("PR 4.4"); + ex.Message.ShouldContain("production runtime not built"); } [Fact] @@ -84,6 +88,48 @@ public sealed class GalaxyDriverReadTests driver.ReadAsync(["x"], CancellationToken.None)); } + [Fact] + public async Task ReadAsync_SubscribeOncePath_ResolvesFromFirstOnDataChange() + { + // Follow-up #1: when no test reader is injected but a subscriber IS, the driver + // synthesises a Read by subscribing, waiting for the first OnDataChange event + // per item handle (gw pushes initial value), then unsubscribing. + var subscriber = new GalaxyDriverSubscribeTests.FakeSubscriber(); + using var driver = new GalaxyDriver( + "g", Opts(), hierarchySource: null, dataReader: null, dataWriter: null, subscriber: subscriber); + + var readTask = driver.ReadAsync(["Tank.Level"], CancellationToken.None); + // Push the "initial value" event the gw would emit immediately after SubscribeBulk. + await Task.Delay(50); // give SubscribeBulk a beat to register + handler to attach + var itemHandle = subscriber.Map["Tank.Level"]; + await subscriber.EmitOnDataChangeAsync(itemHandle, 42.0); + + var result = await readTask; + result.Count.ShouldBe(1); + result[0].Value.ShouldBe(42.0); + // Cleanup unsubscribed the live handle. + subscriber.UnsubscribedHandles.ShouldContain(itemHandle); + } + + [Fact] + public async Task ReadAsync_SubscribeOncePath_RejectedTagSurfacesAsBadStatus() + { + // gw rejects "Bad" at SubscribeBulk; the read path completes that slot with a + // Bad-status snapshot rather than waiting forever for an event that won't come. + var subscriber = new GalaxyDriverSubscribeTests.FakeSubscriber { Decide = tag => tag != "Bad" }; + using var driver = new GalaxyDriver( + "g", Opts(), hierarchySource: null, dataReader: null, dataWriter: null, subscriber: subscriber); + + var readTask = driver.ReadAsync(["Good", "Bad"], CancellationToken.None); + await Task.Delay(50); + await subscriber.EmitOnDataChangeAsync(subscriber.Map["Good"], 1.0); + + var result = await readTask; + result.Count.ShouldBe(2); + result[0].Value.ShouldBe(1.0); + result[1].StatusCode.ShouldBe(0x80000000u); // Bad + } + [Fact] public async Task ReadAsync_PreservesReaderStatusCodes() { diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/GalaxyDriverSubscribeTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/GalaxyDriverSubscribeTests.cs index a5fb50f..3312522 100644 --- a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/GalaxyDriverSubscribeTests.cs +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/GalaxyDriverSubscribeTests.cs @@ -23,7 +23,7 @@ public sealed class GalaxyDriverSubscribeTests new GalaxyRepositoryOptions(), new GalaxyReconnectOptions()); - private sealed class FakeSubscriber : IGalaxySubscriber + internal sealed class FakeSubscriber : IGalaxySubscriber { private int _nextHandle = 1; private readonly Channel _events = Channel.CreateUnbounded();