using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using MxGateway.Client;
using MxGateway.Contracts.Proto;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Browse;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Config;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Health;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;

namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy;

/// <summary>
/// In-process .NET 10 Galaxy driver — the v2 replacement for the Galaxy.Host /
/// Galaxy.Proxy pair. PR 4.0 ships the project skeleton with <see cref="IDriver"/>
/// bodies that wire to a future IGalaxyGatewayClient abstraction. Capability
/// interfaces (browse, read, write, subscribe, history routing, host probes) land in
/// PRs 4.1–4.7; the wiring sequence keeps every intermediate state buildable so the
/// Galaxy:Backend flag (PR 4.W) can flip between legacy-host and mxgateway
/// for parity testing.
/// </summary>
/// <remarks>
/// This driver is registered as a Tier A in-process driver alongside Modbus / S7 / etc.
/// The legacy GalaxyProxyDriver (Driver.Galaxy.Proxy) coexists until PR 7.2;
/// this driver registers under driver-type name "GalaxyMxGateway" so both paths can
/// be live simultaneously during parity testing.
/// </remarks>
public sealed class GalaxyDriver
    : IDriver,
      ITagDiscovery,
      IReadable,
      IWritable,
      ISubscribable,
      IRediscoverable,
      IHostConnectivityProbe,
      IDisposable
{
    // OPC UA status codes used for synthesized snapshots on the read path.
    private const uint StatusBad = 0x80000000u;

    // Bad_Timeout. (0x800B0000 is Bad_ServiceUnsupported, which would mislead clients
    // into thinking Read is not implemented rather than that the value never arrived.)
    private const uint StatusBadTimeout = 0x800A0000u;

    private readonly string _driverInstanceId;
    private readonly GalaxyDriverOptions _options;
    private readonly ILogger _logger;

    // PR 4.1 — IGalaxyHierarchySource is the test seam for browse. When null, the driver
    // lazily builds a GatewayGalaxyHierarchySource around a GalaxyRepositoryClient on
    // first DiscoverAsync. Tests inject a fake source via the internal ctor to exercise
    // GalaxyDiscoverer's translation logic without a real gRPC channel.
    private IGalaxyHierarchySource? _hierarchySource;
    private GalaxyRepositoryClient? _ownedRepositoryClient;

    // PR 4.2 — IGalaxyDataReader is the test seam for IReadable. When it is null,
    // ReadAsync goes through the subscriber (subscribe-once read) and throws
    // NotSupportedException only when no subscriber is available either.
    private IGalaxyDataReader? _dataReader;

    // PR 4.3 — IGalaxyDataWriter is the test seam for IWritable. Production wraps
    // GalaxyMxSession via GatewayGalaxyDataWriter (Write / WriteSecured routing). The
    // per-tag SecurityClassification map is populated during ITagDiscovery and consumed
    // here at write time.
    private IGalaxyDataWriter? _dataWriter;
    private readonly System.Collections.Concurrent.ConcurrentDictionary<string, SecurityClassification> _securityByFullRef =
        new(StringComparer.OrdinalIgnoreCase);

    // PR 4.4 — subscription lifecycle. The pump consumes the gw event stream and fans
    // out OnDataChange events to every registered driver subscription via the registry's
    // reverse map. The subscriber is the test seam — production uses
    // GatewayGalaxySubscriber over a connected GalaxyMxSession.
    private IGalaxySubscriber? _subscriber;
    private readonly SubscriptionRegistry _subscriptions = new();
    private EventPump? _eventPump;
    private readonly Lock _pumpLock = new();

    // PR 4.W — production runtime owned by InitializeAsync. The driver builds these
    // when it opens a real gw session; tests bypass them by injecting seams via the
    // internal ctor.
    private GalaxyMxSession? _ownedMxSession;
    private MxGatewayClient? _ownedMxClient;

    // PR 4.5 — reconnect supervisor. Reflects in DriverState.Degraded while not Healthy.
    private ReconnectSupervisor? _supervisor;

    // PR 4.6 — IRediscoverable plumbing.
    private DeployWatcher? _deployWatcher;

    // PR 4.7 — IHostConnectivityProbe plumbing. The aggregator owns the merged
    // transport+per-platform view; the forwarder is fed from the supervisor on
    // transport state transitions; the probe watcher subscribes ScanState attributes
    // for every discovered platform and pushes value changes to the aggregator.
    private readonly HostStatusAggregator _hostStatuses = new();
    private HostConnectivityForwarder? _transportForwarder;
    private PerPlatformProbeWatcher? _probeWatcher;

    private DriverHealth _health = new(DriverState.Unknown, null, null);
    private bool _disposed;

    /// <summary>
    /// Server-pushed data-change notification. Fires from the
    /// <see cref="EventPump"/>'s background loop; handlers should be cheap (or queue
    /// onto another thread) to avoid blocking the gw event stream.
    /// </summary>
    public event EventHandler<DataChangeEventArgs>? OnDataChange;

    /// <summary>Fires when the gateway signals a deploy-time change (PR 4.6 DeployWatcher).</summary>
    public event EventHandler<RediscoveryEventArgs>? OnRediscoveryNeeded;

    /// <summary>Fires when a host transitions Running ↔ Stopped (PR 4.7 HostStatusAggregator).</summary>
    public event EventHandler<HostStatusChangedEventArgs>? OnHostStatusChanged;

    /// <summary>
    /// Production constructor: no seams are injected, so <see cref="InitializeAsync"/>
    /// builds the gateway runtime from <paramref name="options"/>.
    /// </summary>
    /// <param name="driverInstanceId">Non-blank identifier of this driver instance.</param>
    /// <param name="options">Driver configuration.</param>
    /// <param name="logger">Optional logger; a no-op logger is used when null.</param>
    /// <exception cref="ArgumentException"><paramref name="driverInstanceId"/> is null or whitespace.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
    public GalaxyDriver(
        string driverInstanceId,
        GalaxyDriverOptions options,
        ILogger? logger = null)
        : this(driverInstanceId, options, hierarchySource: null, dataReader: null, dataWriter: null, subscriber: null, logger)
    {
    }

    /// <summary>
    /// Test-visible ctor — inject custom seams so <see cref="DiscoverAsync"/>,
    /// <see cref="ReadAsync"/>, <see cref="WriteAsync"/>, and
    /// <see cref="SubscribeAsync"/> can be exercised against canned data without
    /// building real gRPC channels.
    /// </summary>
    internal GalaxyDriver(
        string driverInstanceId,
        GalaxyDriverOptions options,
        IGalaxyHierarchySource? hierarchySource,
        IGalaxyDataReader? dataReader = null,
        IGalaxyDataWriter? dataWriter = null,
        IGalaxySubscriber? subscriber = null,
        ILogger? logger = null)
    {
        _driverInstanceId = !string.IsNullOrWhiteSpace(driverInstanceId)
            ? driverInstanceId
            : throw new ArgumentException("Driver instance id required.", nameof(driverInstanceId));
        _options = options ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? NullLogger.Instance;
        _hierarchySource = hierarchySource;
        _dataReader = dataReader;
        _dataWriter = dataWriter;
        _subscriber = subscriber;

        // Forward the aggregator's transitions through IHostConnectivityProbe.
        _hostStatuses.OnHostStatusChanged += (_, args) => OnHostStatusChanged?.Invoke(this, args);
    }

    /// <inheritdoc />
    public string DriverInstanceId => _driverInstanceId;

    /// <inheritdoc />
    public string DriverType => GalaxyDriverFactoryExtensions.DriverTypeName;

    /// <summary>Test-visible options snapshot.</summary>
    internal GalaxyDriverOptions Options => _options;

    /// <inheritdoc />
    public async Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
    {
        ObjectDisposedException.ThrowIf(_disposed, this);

        // Tests inject seams via the internal ctor; production InitializeAsync builds
        // the gateway client + session + per-capability runtime components from
        // GalaxyDriverOptions. When seams are pre-injected we leave them alone (the
        // test exercises the wired surface without a real gw round-trip).
        if (_subscriber is null && _dataWriter is null && _hierarchySource is null)
        {
            await BuildProductionRuntimeAsync(cancellationToken).ConfigureAwait(false);
        }
        else
        {
            _logger.LogDebug(
                "GalaxyDriver {InstanceId} initializing with pre-injected seams — production runtime build skipped",
                _driverInstanceId);
        }

        StartDeployWatcher();

        _logger.LogInformation(
            "GalaxyDriver {InstanceId} initialized — endpoint={Endpoint} clientName={ClientName}",
            _driverInstanceId, _options.Gateway.Endpoint, _options.MxAccess.ClientName);
        _health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
    }

    /// <summary>
    /// Build the production gw client + session + per-capability runtime components
    /// from <c>_options</c>. Sets up the reconnect supervisor's reopen / replay
    /// callbacks so a transport drop replays every active subscription on the
    /// restored session.
    /// </summary>
    private async Task BuildProductionRuntimeAsync(CancellationToken cancellationToken)
    {
        var clientOptions = BuildClientOptions(_options.Gateway);
        _ownedMxClient = MxGatewayClient.Create(clientOptions);

        _ownedMxSession = new GalaxyMxSession(_options.MxAccess, _logger);
        await _ownedMxSession.ConnectAsync(clientOptions, cancellationToken).ConfigureAwait(false);

        // PR 6.1 — wrap the gw-facing seams in tracing decorators so every Subscribe /
        // Unsubscribe / Write / StreamEvents call emits a span on the
        // "ZB.MOM.WW.OtOpcUa.Driver.Galaxy" ActivitySource. The host process's tracing
        // listener (OTLP exporter, dotnet-trace, etc.) consumes these without the driver
        // taking a dependency on the OpenTelemetry packages.
        _subscriber = new TracedGalaxySubscriber(
            new GatewayGalaxySubscriber(_ownedMxSession),
            _options.MxAccess.ClientName);
        _dataWriter = new TracedGalaxyDataWriter(
            new GatewayGalaxyDataWriter(_ownedMxSession, _options.MxAccess.WriteUserId, _logger),
            _options.MxAccess.ClientName);

        _supervisor = new ReconnectSupervisor(
            reopen: ReopenAsync,
            replay: ReplayAsync,
            options: new ReconnectOptions(
                InitialBackoffOverride: TimeSpan.FromMilliseconds(_options.Reconnect.InitialBackoffMs),
                MaxBackoffOverride: TimeSpan.FromMilliseconds(_options.Reconnect.MaxBackoffMs)),
            logger: _logger);

        _transportForwarder = new HostConnectivityForwarder(_options.MxAccess.ClientName, _hostStatuses, _logger);
        _transportForwarder.SetTransport(HostState.Running); // initial state — we just connected
        _supervisor.StateChanged += OnSupervisorStateChanged;

        _probeWatcher = new PerPlatformProbeWatcher(
            _subscriber,
            _hostStatuses,
            _logger,
            bufferedUpdateIntervalMs: _options.MxAccess.PublishingIntervalMs);
    }

    /// <summary>
    /// Reopen callback for <see cref="ReconnectSupervisor"/>: re-connects the owned gw
    /// session using freshly built client options. No-op when no session is owned.
    /// </summary>
    private async Task ReopenAsync(CancellationToken cancellationToken)
    {
        if (_ownedMxSession is null)
        {
            return;
        }

        var clientOptions = BuildClientOptions(_options.Gateway);
        await _ownedMxSession.ConnectAsync(clientOptions, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Replay callback. Walks every active subscription's bindings and re-issues
    /// SubscribeBulk for the distinct tag list. PR 6.x can swap this for the gw's
    /// batched ReplaySubscriptionsCommand once it ships.
    /// </summary>
    private async Task ReplayAsync(CancellationToken cancellationToken)
    {
        if (_subscriber is null)
        {
            return;
        }

        var bindings = _subscriptions.SnapshotAllBindings();
        if (bindings.Count == 0)
        {
            return;
        }

        var refs = bindings
            .Select(b => b.FullReference)
            .Distinct(StringComparer.OrdinalIgnoreCase)
            .ToArray();

        // NOTE(review): the SubscribeBulk results are discarded, so the registry keeps
        // the item handles issued before the reconnect — confirm the gateway re-issues
        // the same handles, otherwise fan-out and UnsubscribeBulk use stale handles.
        await _subscriber.SubscribeBulkAsync(
            refs, _options.MxAccess.PublishingIntervalMs, cancellationToken).ConfigureAwait(false);

        _logger.LogInformation(
            "GalaxyDriver {InstanceId} replay completed — {Count} tags re-subscribed",
            _driverInstanceId, refs.Length);
    }

    /// <summary>
    /// Mirrors the reconnect supervisor's state into <see cref="DriverHealth"/> and the
    /// transport host-status entry: Healthy maps to Healthy/Running, everything else to
    /// Degraded/Stopped.
    /// </summary>
    private void OnSupervisorStateChanged(object? sender, StateTransition transition)
    {
        var isHealthy = transition.Next == ReconnectSupervisor.State.Healthy;

        _health = isHealthy
            ? new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null)
            : new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, transition.Cause);

        _transportForwarder?.SetTransport(isHealthy ? HostState.Running : HostState.Stopped);
    }

    /// <summary>
    /// Resolves <c>Gateway.ApiKeySecretRef</c> to the actual API key. Three forms are
    /// supported, evaluated in order:
    /// <list type="bullet">
    ///   <item><c>env:NAME</c> — reads <c>Environment.GetEnvironmentVariable(NAME)</c>.
    ///   Throws when the variable is unset or empty, so a misconfigured deployment fails
    ///   fast at InitializeAsync rather than silently sending an empty key.</item>
    ///   <item><c>file:PATH</c> — reads text from <c>PATH</c>, trimming whitespace. Lets
    ///   operators stash the key in an ACL'd file outside the repo (the same pattern as
    ///   the legacy <c>.local/galaxy-host-secret.txt</c>).</item>
    ///   <item>Anything else — used as the literal API key. Convenient for dev, and
    ///   avoids breaking existing configs that pre-date this resolver.</item>
    /// </list>
    /// A future PR can swap any of these arms for a DPAPI-backed lookup without
    /// changing the call site.
    /// </summary>
    /// <param name="secretRef">The configured secret reference; must be non-empty.</param>
    /// <returns>The resolved, non-empty API key.</returns>
    /// <exception cref="ArgumentException"><paramref name="secretRef"/> is null or empty.</exception>
    /// <exception cref="InvalidOperationException">
    /// The referenced environment variable is unset/empty, or the referenced file is
    /// missing or contains only whitespace.
    /// </exception>
    internal static string ResolveApiKey(string secretRef)
    {
        ArgumentException.ThrowIfNullOrEmpty(secretRef);

        if (secretRef.StartsWith("env:", StringComparison.OrdinalIgnoreCase))
        {
            var name = secretRef[4..];
            var value = Environment.GetEnvironmentVariable(name);
            return !string.IsNullOrEmpty(value)
                ? value
                : throw new InvalidOperationException(
                    $"Galaxy.Gateway.ApiKeySecretRef='{secretRef}' resolves to env var '{name}', but it is unset.");
        }

        if (secretRef.StartsWith("file:", StringComparison.OrdinalIgnoreCase))
        {
            var path = secretRef[5..];
            if (!File.Exists(path))
            {
                throw new InvalidOperationException(
                    $"Galaxy.Gateway.ApiKeySecretRef='{secretRef}' points at '{path}', which doesn't exist.");
            }

            var contents = File.ReadAllText(path).Trim();
            return !string.IsNullOrEmpty(contents)
                ? contents
                : throw new InvalidOperationException(
                    $"Galaxy.Gateway.ApiKeySecretRef='{secretRef}' file '{path}' is empty.");
        }

        return secretRef;
    }

    /// <summary>
    /// Translates <see cref="GalaxyGatewayOptions"/> into gateway client options. A
    /// non-positive <c>StreamTimeoutSeconds</c> maps to a null stream timeout.
    /// </summary>
    private static MxGatewayClientOptions BuildClientOptions(GalaxyGatewayOptions gw) => new()
    {
        Endpoint = new Uri(gw.Endpoint, UriKind.Absolute),
        ApiKey = ResolveApiKey(gw.ApiKeySecretRef),
        UseTls = gw.UseTls,
        CaCertificatePath = gw.CaCertificatePath,
        ConnectTimeout = TimeSpan.FromSeconds(gw.ConnectTimeoutSeconds),
        DefaultCallTimeout = TimeSpan.FromSeconds(gw.DefaultCallTimeoutSeconds),
        StreamTimeout = gw.StreamTimeoutSeconds > 0
            ? TimeSpan.FromSeconds(gw.StreamTimeoutSeconds)
            : null,
    };

    /// <summary>
    /// Starts the deploy watcher when <c>Repository.WatchDeployEvents</c> is enabled and
    /// forwards its notifications through <see cref="OnRediscoveryNeeded"/>.
    /// </summary>
    private void StartDeployWatcher()
    {
        if (!_options.Repository.WatchDeployEvents)
        {
            return;
        }

        // A second InitializeAsync must not orphan a watcher that is already running.
        if (_deployWatcher is not null)
        {
            return;
        }

        // NOTE(review): on the production path neither field is set by
        // BuildProductionRuntimeAsync, so when InitializeAsync runs before the first
        // DiscoverAsync this guard returns and no watcher is started. The "build the
        // client here" branch below is then reachable only when a hierarchy source was
        // injected. Confirm this is the intended gating.
        if (_ownedRepositoryClient is null && _hierarchySource is null)
        {
            return;
        }

        // Reuse the lazily-built repository client (DiscoverAsync constructs it on demand).
        // If discovery hasn't run yet, build the client here so the watcher has a target.
        _ownedRepositoryClient ??= GalaxyRepositoryClient.Create(BuildClientOptions(_options.Gateway));

        var source = new GatewayGalaxyDeployWatchSource(_ownedRepositoryClient);
        _deployWatcher = new DeployWatcher(source, _logger);
        _deployWatcher.OnRediscoveryNeeded += (_, args) => OnRediscoveryNeeded?.Invoke(this, args);

        // Fire-and-forget: the returned task is intentionally not awaited.
        _ = _deployWatcher.StartAsync(CancellationToken.None);
    }

    /// <inheritdoc />
    public Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
    {
        // In-place config reapply. PR 4.5's reconnect supervisor will swap the
        // gateway-client options under the lock; for the skeleton we just refresh health.
        ObjectDisposedException.ThrowIf(_disposed, this);
        _health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
        return Task.CompletedTask;
    }

    /// <inheritdoc />
    public Task ShutdownAsync(CancellationToken cancellationToken)
    {
        if (_disposed)
        {
            return Task.CompletedTask;
        }

        _logger.LogInformation("GalaxyDriver {InstanceId} shutting down", _driverInstanceId);
        _health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null);
        return Task.CompletedTask;
    }

    /// <inheritdoc />
    public DriverHealth GetHealth()
    {
        // Reconnect supervisor wins when degraded — the cached _health reflects the last
        // successful operation, but ongoing recovery should surface as Degraded.
        if (_supervisor?.IsDegraded == true)
        {
            return new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, _supervisor.LastError);
        }

        return _health;
    }

    // ===== IHostConnectivityProbe (PR 4.7 wire-up) =====

    /// <inheritdoc />
    public IReadOnlyList<HostConnectivityStatus> GetHostStatuses() => _hostStatuses.Snapshot();

    /// <inheritdoc />
    public long GetMemoryFootprint() => 0; // PR 4.4 sets this from SubscriptionRegistry size.

    /// <inheritdoc />
    public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask;

    // ===== ITagDiscovery (PR 4.1) =====

    /// <inheritdoc />
    public async Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken cancellationToken)
    {
        ObjectDisposedException.ThrowIf(_disposed, this);
        ArgumentNullException.ThrowIfNull(builder);

        // PR 4.3 — capture SecurityClassification per attribute. PR 4.W — also refresh
        // the per-platform probe watcher's membership after discovery so newly-added
        // $WinPlatform / $AppEngine objects start advising their ScanState attribute.
        var capturingBuilder = new SecurityCapturingBuilder(builder, _securityByFullRef);

        var source = _hierarchySource ??= BuildDefaultHierarchySource();
        var discoverer = new GalaxyDiscoverer(source);
        await discoverer.DiscoverAsync(capturingBuilder, cancellationToken).ConfigureAwait(false);

        if (_probeWatcher is null)
        {
            return;
        }

        var hierarchy = await source.GetHierarchyAsync(cancellationToken).ConfigureAwait(false);
        var platforms = hierarchy
            .Where(o => o.TemplateChain.Any(IsPlatformTemplate))
            .Select(o => o.TagName)
            .Where(name => !string.IsNullOrEmpty(name));
        await _probeWatcher.SyncPlatformsAsync(platforms, cancellationToken).ConfigureAwait(false);

        static bool IsPlatformTemplate(string template) =>
            string.Equals(template, "$WinPlatform", StringComparison.OrdinalIgnoreCase)
            || string.Equals(template, "$AppEngine", StringComparison.OrdinalIgnoreCase);
    }

    /// <summary>
    /// Looks up the classification captured during discovery; references that were
    /// never discovered fall back to <c>FreeAccess</c>.
    /// </summary>
    private SecurityClassification ResolveSecurity(string fullReference) =>
        _securityByFullRef.TryGetValue(fullReference, out var sec)
            ? sec
            : SecurityClassification.FreeAccess;

    // ===== IReadable =====

    /// <inheritdoc />
    public Task<IReadOnlyList<DataValueSnapshot>> ReadAsync(
        IReadOnlyList<string> fullReferences, CancellationToken cancellationToken)
    {
        ObjectDisposedException.ThrowIf(_disposed, this);
        ArgumentNullException.ThrowIfNull(fullReferences);

        if (fullReferences.Count == 0)
        {
            return Task.FromResult<IReadOnlyList<DataValueSnapshot>>([]);
        }

        if (_dataReader is not null)
        {
            // Test-only path — tests inject a canned reader via the internal ctor.
            return _dataReader.ReadAsync(fullReferences, cancellationToken);
        }

        if (_subscriber is null)
        {
            throw new NotSupportedException(
                "GalaxyDriver.ReadAsync requires a connected GalaxyMxSession (production runtime not built). " +
                "Either inject a test seam via the internal ctor or call InitializeAsync against a real gateway.");
        }

        return ReadViaSubscribeOnceAsync(fullReferences, cancellationToken);
    }

    /// <summary>
    /// Production read path. MxAccess has no one-shot Read RPC — every value comes
    /// through the event stream. We synthesise a Read by:
    /// <list type="number">
    ///   <item>Subscribing the requested tags through the existing
    ///   <see cref="EventPump"/> + <see cref="SubscriptionRegistry"/>.</item>
    ///   <item>Waiting for the first OnDataChange per full reference (the gateway
    ///   pushes the current value as the initial event after a SubscribeBulk).</item>
    ///   <item>Unsubscribing.</item>
    /// </list>
    /// Tags the gw rejects at SubscribeBulk time return a Bad snapshot; tags that never
    /// publish before the caller's cancellation token fires return a Bad_Timeout
    /// snapshot. Results are in input order, one snapshot per requested reference.
    /// </summary>
    /// <remarks>
    /// With a token that can never be cancelled, a tag that never publishes keeps this
    /// call pending indefinitely.
    /// </remarks>
    private async Task<IReadOnlyList<DataValueSnapshot>> ReadViaSubscribeOnceAsync(
        IReadOnlyList<string> fullReferences, CancellationToken cancellationToken)
    {
        var pump = EnsureEventPumpStarted();
        var subscriptionId = _subscriptions.NextSubscriptionId();

        // Pre-allocate one TaskCompletionSource per full-reference so the OnDataChange
        // handler can complete them out-of-order as events arrive. Wired BEFORE the
        // SubscribeBulk call so we don't race with the first event the gw pushes.
        var pendingByRef = new Dictionary<string, TaskCompletionSource<DataValueSnapshot>>(
            StringComparer.OrdinalIgnoreCase);
        foreach (var fullRef in fullReferences.Distinct(StringComparer.OrdinalIgnoreCase))
        {
            pendingByRef[fullRef] = new TaskCompletionSource<DataValueSnapshot>(
                TaskCreationOptions.RunContinuationsAsynchronously);
        }

        EventHandler<DataChangeEventArgs> handler = (_, args) =>
        {
            // Filter to OUR subscription — the pump's OnDataChange fans out across all
            // subscriptions on the driver, and we don't want a parallel ISubscribable
            // caller's events to leak into our read.
            if (args.SubscriptionHandle is GalaxySubscriptionHandle gsh
                && gsh.SubscriptionId == subscriptionId
                && pendingByRef.TryGetValue(args.FullReference, out var tcs))
            {
                tcs.TrySetResult(args.Snapshot);
            }
        };

        var bindings = new List<TagBinding>(fullReferences.Count);
        var registered = false;

        pump.OnDataChange += handler;
        try
        {
            var results = await _subscriber!
                .SubscribeBulkAsync(fullReferences, _options.MxAccess.PublishingIntervalMs, cancellationToken)
                .ConfigureAwait(false);

            // One pass to index results by tag instead of a linear scan per reference;
            // taking the first entry per key keeps first-match semantics.
            var resultsByTag = results.ToLookup(r => r.TagAddress, StringComparer.OrdinalIgnoreCase);

            // Register bindings so the pump knows to dispatch events for these handles.
            foreach (var fullRef in fullReferences)
            {
                var match = resultsByTag[fullRef].FirstOrDefault();
                var itemHandle = match is { WasSuccessful: true } ? match.ItemHandle : 0;
                bindings.Add(new TagBinding(fullRef, itemHandle));

                // Tags the gw rejected up front — complete with Bad status now so the
                // wait below doesn't stall on them.
                if (itemHandle <= 0 && pendingByRef.TryGetValue(fullRef, out var rejectedTcs))
                {
                    rejectedTcs.TrySetResult(CreateBadSnapshot(StatusBad));
                }
            }

            _subscriptions.Register(subscriptionId, bindings);
            registered = true;

            // Wait for every pending TCS to complete or the caller's CT to fire. When the
            // CT fires before all values arrive, fill the still-pending entries with a
            // Bad_Timeout snapshot rather than throwing — Read semantics let callers see
            // partial results.
            using var registration = cancellationToken.Register(() =>
            {
                foreach (var tcs in pendingByRef.Values)
                {
                    tcs.TrySetResult(CreateBadSnapshot(StatusBadTimeout));
                }
            });

            var snapshots = new DataValueSnapshot[fullReferences.Count];
            for (var i = 0; i < fullReferences.Count; i++)
            {
                snapshots[i] = await pendingByRef[fullReferences[i]].Task.ConfigureAwait(false);
            }

            return snapshots;
        }
        finally
        {
            // A single finally covers every exit after the handler was attached, so a
            // failure between SubscribeBulk and the wait cannot leak the handler or the
            // live gw handles.
            pump.OnDataChange -= handler;

            if (registered)
            {
                _subscriptions.Remove(subscriptionId);
            }

            // UnsubscribeBulkAsync's failure isn't fatal — the registry is already
            // cleared, so any straggling event from the gw would be a no-op fan-out.
            var liveHandles = bindings.Where(b => b.ItemHandle > 0).Select(b => b.ItemHandle).ToArray();
            if (liveHandles.Length > 0)
            {
                try
                {
                    await _subscriber!.UnsubscribeBulkAsync(liveHandles, CancellationToken.None)
                        .ConfigureAwait(false);
                }
                catch (Exception ex)
                {
                    _logger.LogWarning(ex,
                        "GalaxyDriver.ReadViaSubscribeOnceAsync UnsubscribeBulk failed for {Count} handle(s) — registry already cleared.",
                        liveHandles.Length);
                }
            }
        }
    }

    /// <summary>Builds a value-less snapshot carrying <paramref name="statusCode"/> and the current UTC server timestamp.</summary>
    private static DataValueSnapshot CreateBadSnapshot(uint statusCode) => new(
        Value: null,
        StatusCode: statusCode,
        SourceTimestampUtc: null,
        ServerTimestampUtc: DateTime.UtcNow);

    // ===== IWritable (PR 4.3) =====

    /// <inheritdoc />
    public Task<IReadOnlyList<WriteResult>> WriteAsync(
        IReadOnlyList<WriteRequest> writes, CancellationToken cancellationToken)
    {
        ObjectDisposedException.ThrowIf(_disposed, this);
        ArgumentNullException.ThrowIfNull(writes);

        if (writes.Count == 0)
        {
            return Task.FromResult<IReadOnlyList<WriteResult>>([]);
        }

        if (_dataWriter is null)
        {
            // Mirror the IReadable fallback: production write path runs on top of
            // GalaxyMxSession (PR 4.2 skeleton; PR 4.4 wires the live session). Until
            // that lands, deployments selecting Galaxy:Backend=mxgateway can't write.
            throw new NotSupportedException(
                "GalaxyDriver.WriteAsync requires GatewayGalaxyDataWriter wired against a connected " +
                "GalaxyMxSession (PR 4.4). Until that lands, route writes through the legacy-host " +
                "backend (Galaxy:Backend=legacy-host).");
        }

        return _dataWriter.WriteAsync(writes, ResolveSecurity, cancellationToken);
    }

    // ===== ISubscribable (PR 4.4) =====

    /// <inheritdoc />
    public async Task<ISubscriptionHandle> SubscribeAsync(
        IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
    {
        ObjectDisposedException.ThrowIf(_disposed, this);
        ArgumentNullException.ThrowIfNull(fullReferences);

        if (_subscriber is null)
        {
            throw new NotSupportedException(
                "GalaxyDriver.SubscribeAsync requires a connected GalaxyMxSession + GatewayGalaxySubscriber. " +
                "PR 4.W wires the production session; until then route subscriptions through the legacy-host backend.");
        }

        // The pump is shared and owned by the driver; it only has to be running before
        // the first binding is registered.
        EnsureEventPumpStarted();
        var subscriptionId = _subscriptions.NextSubscriptionId();

        if (fullReferences.Count == 0)
        {
            // Empty subscriptions register but never bind anything — keeps Unsubscribe
            // symmetric for callers that conditionally add tags later.
            _subscriptions.Register(subscriptionId, []);
            return new GalaxySubscriptionHandle(subscriptionId);
        }

        // PR 6.3 — when the caller doesn't set a publishing interval (TimeSpan.Zero or
        // negative), fall back to the configured MxAccess.PublishingIntervalMs. The
        // server's UA subscription publishingInterval drives this in production; tests
        // and infrastructure callers (probe watcher, deploy watcher) hit the fallback.
        // Clamp before the cast so an oversized interval cannot wrap on conversion.
        var requested = (int)Math.Clamp(publishingInterval.TotalMilliseconds, 0d, int.MaxValue);
        var bufferedIntervalMs = requested > 0
            ? requested
            : _options.MxAccess.PublishingIntervalMs;

        var results = await _subscriber
            .SubscribeBulkAsync(fullReferences, bufferedIntervalMs, cancellationToken)
            .ConfigureAwait(false);

        // One pass to index results by tag instead of a linear scan per reference;
        // taking the first entry per key keeps first-match semantics.
        var resultsByTag = results.ToLookup(r => r.TagAddress, StringComparer.OrdinalIgnoreCase);

        // Build the binding list in input order. Failed entries (gw rejected the tag) are
        // recorded with a non-positive ItemHandle so the caller can detect partial failure
        // by inspecting the returned handle's diagnostic context — full per-tag error
        // surface lands in PR 5.3's parity tests.
        var bindings = new List<TagBinding>(fullReferences.Count);
        foreach (var fullRef in fullReferences)
        {
            var match = resultsByTag[fullRef].FirstOrDefault();
            var itemHandle = match is { WasSuccessful: true } ? match.ItemHandle : 0;
            bindings.Add(new TagBinding(fullRef, itemHandle));

            if (match is null || !match.WasSuccessful)
            {
                _logger.LogWarning(
                    "Galaxy subscribe for {FullRef} failed: {Error}",
                    fullRef, match?.ErrorMessage ?? "");
            }
        }

        _subscriptions.Register(subscriptionId, bindings);
        return new GalaxySubscriptionHandle(subscriptionId);
    }

    /// <inheritdoc />
    public async Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
    {
        ObjectDisposedException.ThrowIf(_disposed, this);
        ArgumentNullException.ThrowIfNull(handle);

        if (handle is not GalaxySubscriptionHandle gsh)
        {
            throw new ArgumentException(
                $"Subscription handle was not issued by this driver (expected GalaxySubscriptionHandle, got {handle.GetType().Name}).",
                nameof(handle));
        }

        var bindings = _subscriptions.Remove(gsh.SubscriptionId);
        if (bindings is null)
        {
            return; // already removed or never registered
        }

        var liveItemHandles = bindings.Where(b => b.ItemHandle > 0).Select(b => b.ItemHandle).ToArray();
        if (liveItemHandles.Length == 0 || _subscriber is null)
        {
            return;
        }

        try
        {
            await _subscriber.UnsubscribeBulkAsync(liveItemHandles, cancellationToken).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex,
                "Galaxy UnsubscribeBulk failed for subscription {SubscriptionId} — registry already cleared on driver side.",
                gsh.SubscriptionId);
        }
    }

    /// <summary>
    /// Lazily start the <see cref="EventPump"/> on the first subscribe. The pump is
    /// shared across every subscription on this driver — fan-out happens through the
    /// <see cref="SubscriptionRegistry"/> reverse map, not by spinning a pump per
    /// subscription.
    /// </summary>
    /// <returns>The running pump; the same instance on every call until disposal.</returns>
    private EventPump EnsureEventPumpStarted()
    {
        lock (_pumpLock)
        {
            if (_eventPump is not null)
            {
                return _eventPump;
            }

            _eventPump = new EventPump(
                _subscriber!,
                _subscriptions,
                _logger,
                channelCapacity: _options.MxAccess.EventPumpChannelCapacity,
                clientName: _options.MxAccess.ClientName);
            _eventPump.OnDataChange += OnPumpDataChange;
            _eventPump.Start();
            return _eventPump;
        }
    }

    /// <summary>
    /// Forwards every fan-out event to the public <see cref="OnDataChange"/> for
    /// ISubscribable consumers, AND routes ScanState changes to the per-platform
    /// probe watcher (PR 4.7) so platform health entries update without the watcher
    /// consuming the event stream itself.
    /// </summary>
    private void OnPumpDataChange(object? sender, DataChangeEventArgs args)
    {
        OnDataChange?.Invoke(this, args);

        if (_probeWatcher is null
            || !args.FullReference.EndsWith(PerPlatformProbeWatcher.ProbeSuffix, StringComparison.OrdinalIgnoreCase))
        {
            return;
        }

        // The probe decoder takes a raw quality byte; recover it from the StatusCode's
        // top two bits (Good=0 → byte 192, Uncertain=1 → byte 64, anything else → byte 0).
        var severity = (args.Snapshot.StatusCode >> 30) & 0x3;
        byte qualityByte = severity switch
        {
            0 => 192,
            1 => 64,
            _ => 0,
        };
        _probeWatcher.OnProbeValueChanged(args.FullReference, args.Snapshot.Value, qualityByte);
    }

    /// <summary>
    /// Lazily builds the default <see cref="IGalaxyHierarchySource"/> from
    /// <c>_options.Gateway</c>. The owned <see cref="GalaxyRepositoryClient"/> is
    /// disposed in <see cref="Dispose"/>. Tests bypass this by injecting their own
    /// source via the internal ctor.
    /// </summary>
    private IGalaxyHierarchySource BuildDefaultHierarchySource()
    {
        // Same option mapping as every other gateway client the driver builds.
        _ownedRepositoryClient = GalaxyRepositoryClient.Create(BuildClientOptions(_options.Gateway));
        return new TracedGalaxyHierarchySource(
            new GatewayGalaxyHierarchySource(_ownedRepositoryClient),
            _options.MxAccess.ClientName);
    }

    /// <summary>
    /// Tears down the watchers, supervisor, pump, session and clients. Idempotent, and
    /// every step is best-effort: a failure is logged and the remaining steps still run.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        // Order: stop deploy watcher, supervisor, probe watcher, pump, then sessions and
        // clients. Each step is best-effort — disposal during a faulted state shouldn't
        // throw and prevent the rest of the cleanup.
        try { _deployWatcher?.Dispose(); }
        catch (Exception ex) { _logger.LogWarning(ex, "DeployWatcher dispose failed"); }

        try { _supervisor?.Dispose(); }
        catch (Exception ex) { _logger.LogWarning(ex, "ReconnectSupervisor dispose failed"); }

        try { _probeWatcher?.Dispose(); }
        catch (Exception ex) { _logger.LogWarning(ex, "ProbeWatcher dispose failed"); }

        try { _transportForwarder?.Dispose(); }
        catch (Exception ex) { _logger.LogWarning(ex, "Transport forwarder dispose failed"); }

        EventPump? pump;
        lock (_pumpLock)
        {
            pump = _eventPump;
            _eventPump = null;
        }

        // The async disposals block because IDisposable.Dispose is synchronous. They are
        // guarded like the steps above so one failure cannot leak the resources after it.
        try { pump?.DisposeAsync().AsTask().GetAwaiter().GetResult(); }
        catch (Exception ex) { _logger.LogWarning(ex, "EventPump dispose failed"); }

        try { _ownedMxSession?.DisposeAsync().AsTask().GetAwaiter().GetResult(); }
        catch (Exception ex) { _logger.LogWarning(ex, "GalaxyMxSession dispose failed"); }
        _ownedMxSession = null;

        try { _ownedMxClient?.DisposeAsync().AsTask().GetAwaiter().GetResult(); }
        catch (Exception ex) { _logger.LogWarning(ex, "MxGatewayClient dispose failed"); }
        _ownedMxClient = null;

        try { _ownedRepositoryClient?.DisposeAsync().AsTask().GetAwaiter().GetResult(); }
        catch (Exception ex) { _logger.LogWarning(ex, "GalaxyRepositoryClient dispose failed"); }
        _ownedRepositoryClient = null;

        _hierarchySource = null;
    }

    /// <summary>
    /// Address-space builder wrapper that records each variable's
    /// <see cref="DriverAttributeInfo.SecurityClass"/> into the supplied dictionary
    /// before delegating to the inner builder. Used by <see cref="DiscoverAsync"/>
    /// to capture per-tag classifications for the IWritable routing decision —
    /// PR 4.3 needs the data, but the discoverer itself doesn't (and shouldn't)
    /// know about the driver's internal state.
    /// </summary>
    private sealed class SecurityCapturingBuilder(
        IAddressSpaceBuilder inner,
        System.Collections.Concurrent.ConcurrentDictionary<string, SecurityClassification> map)
        : IAddressSpaceBuilder
    {
        /// <summary>Wraps the child folder builder so nested variables are captured into the same map.</summary>
        public IAddressSpaceBuilder Folder(string browseName, string displayName) =>
            new SecurityCapturingBuilder(inner.Folder(browseName, displayName), map);

        /// <summary>Records the attribute's classification under its full name, then delegates.</summary>
        public IVariableHandle Variable(string browseName, string displayName, DriverAttributeInfo attributeInfo)
        {
            map[attributeInfo.FullName] = attributeInfo.SecurityClass;
            return inner.Variable(browseName, displayName, attributeInfo);
        }

        /// <summary>Pure pass-through to the inner builder.</summary>
        public void AddProperty(string browseName, DriverDataType dataType, object? value) =>
            inner.AddProperty(browseName, dataType, value);
    }
}