Files
lmxopcua/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs
Joseph Doherty 4df8737c86 fix(driver-galaxy): wire event-stream faults to the reconnect supervisor (Driver.Galaxy-001)
The ReconnectSupervisor was constructed but its trigger
ReportTransportFailure was never called. When the gateway StreamEvents
stream faulted, EventPump just logged and exited — the supervisor was
never notified, so a transient gateway drop permanently stopped
data-change notifications while GetHealth() still reported Healthy.

EventPump gains an optional onStreamFault callback invoked from its
stream-fault catch block (not on clean shutdown). GalaxyDriver wires it
to ReconnectSupervisor.ReportTransportFailure so a transport drop drives
reopen → replay.

This is the minimal fix for -001; the pump-restart-on-reopen gap remains
tracked as Driver.Galaxy-008. Regression tests cover the callback being
invoked on fault, the end-to-end supervisor reopen/replay, and that a
clean shutdown does not fire it. Driver.Galaxy suite: 206/206 pass.

Resolves code-review finding Driver.Galaxy-001 (Critical).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 05:54:33 -04:00

1026 lines
48 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using MxGateway.Client;
using MxGateway.Contracts.Proto;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Browse;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Config;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Health;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy;
/// <summary>
/// In-process .NET 10 Galaxy driver — the v2 replacement for the Galaxy.Host /
/// Galaxy.Proxy pair. PR 4.0 ships the project skeleton with <see cref="IDriver"/>
/// bodies that wire to a future <c>IGalaxyGatewayClient</c> abstraction. Capability
/// interfaces (browse, read, write, subscribe, history routing, host probes) land in
/// PRs 4.1–4.7; the wiring sequence keeps every intermediate state buildable so the
/// <c>Galaxy:Backend</c> flag (PR 4.W) can flip between legacy-host and mxgateway
/// for parity testing.
/// </summary>
/// <remarks>
/// This driver is registered as a Tier A in-process driver alongside Modbus / S7 / etc.
/// The legacy <c>GalaxyProxyDriver</c> (Driver.Galaxy.Proxy) coexists until PR 7.2;
/// <see cref="GalaxyDriverFactoryExtensions"/> registers under driver-type name
/// "GalaxyMxGateway" so both paths can be live simultaneously during parity testing.
/// </remarks>
public sealed class GalaxyDriver
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IRediscoverable, IHostConnectivityProbe, IAlarmSource, IDisposable
{
// Identity, configuration and logger captured by the constructor; never reassigned.
private readonly string _driverInstanceId;
private readonly GalaxyDriverOptions _options;
private readonly ILogger<GalaxyDriver> _logger;
// PR 4.1 — IGalaxyHierarchySource is the test seam for browse. When null, the driver
// lazily builds a GatewayGalaxyHierarchySource around a GalaxyRepositoryClient on
// first DiscoverAsync. Tests inject a fake source via the internal ctor to exercise
// GalaxyDiscoverer's translation logic without a real gRPC channel.
private IGalaxyHierarchySource? _hierarchySource;
private GalaxyRepositoryClient? _ownedRepositoryClient;
// PR 4.2 — IGalaxyDataReader is the test seam for IReadable. PR 4.4 supplies the
// production implementation that wraps GalaxyMxSession's SubscribeBulk + StreamEvents
// pump; until then ReadAsync throws NotSupportedException when the reader is null
// (legacy-host backend handles reads in production via DriverNodeManager's
// capability-routing).
private IGalaxyDataReader? _dataReader;
// PR 4.3 — IGalaxyDataWriter is the test seam for IWritable. Production wraps
// GalaxyMxSession via GatewayGalaxyDataWriter (Write / WriteSecured routing). The
// per-tag SecurityClassification map is populated during ITagDiscovery and consumed
// here at write time.
private IGalaxyDataWriter? _dataWriter;
// Keyed by full tag reference, compared case-insensitively (OrdinalIgnoreCase).
private readonly System.Collections.Concurrent.ConcurrentDictionary<string, SecurityClassification>
_securityByFullRef = new(StringComparer.OrdinalIgnoreCase);
// PR 4.4 — subscription lifecycle. The pump consumes the gw event stream and fans
// out OnDataChange events to every registered driver subscription via the registry's
// reverse map. The subscriber is the test seam — production uses
// GatewayGalaxySubscriber over a connected GalaxyMxSession.
private IGalaxySubscriber? _subscriber;
private readonly SubscriptionRegistry _subscriptions = new();
private EventPump? _eventPump;
// Guards the lazy creation of _eventPump in EnsureEventPumpStarted.
private readonly Lock _pumpLock = new();
// IAlarmSource implementation. Production-side acks route through
// GatewayGalaxyAlarmAcknowledger which calls the session-less
// MxGatewayClient.AcknowledgeAlarmAsync RPC; alarm transitions arrive on the
// gateway's session-less StreamAlarms feed via GatewayGalaxyAlarmFeed. Tests inject
// IGalaxyAlarmAcknowledger + IGalaxyAlarmFeed via the internal ctor to exercise the
// wiring without a running gateway. This driver bridges the feed's OnAlarmTransition
// onto IAlarmSource.OnAlarmEvent.
private IGalaxyAlarmAcknowledger? _alarmAcknowledger;
private IGalaxyAlarmFeed? _alarmFeed;
// _alarmHandlersLock guards _alarmSubscriptions; _alarmFeedLock guards the one-time
// feed start and the _alarmFeedWired flag (see EnsureAlarmFeedStarted).
private readonly Lock _alarmHandlersLock = new();
private readonly Lock _alarmFeedLock = new();
// True once the feed's OnAlarmTransition handler is attached and Start() has returned.
private bool _alarmFeedWired;
private readonly HashSet<GalaxyAlarmSubscriptionHandle> _alarmSubscriptions = new();
// PR 4.W — production runtime owned by InitializeAsync. The driver builds these
// when it opens a real gw session; tests bypass them by injecting seams via the
// internal ctor.
private GalaxyMxSession? _ownedMxSession;
private MxGatewayClient? _ownedMxClient;
// PR 4.5 — reconnect supervisor. Reflects in DriverState.Degraded while not Healthy.
private ReconnectSupervisor? _supervisor;
// PR 4.6 — IRediscoverable plumbing.
private DeployWatcher? _deployWatcher;
// PR 4.7 — IHostConnectivityProbe plumbing. The aggregator owns the merged
// transport+per-platform view; the forwarder is fed from the supervisor on
// transport state transitions; the probe watcher subscribes ScanState attributes
// for every discovered platform and pushes value changes to the aggregator.
private readonly HostStatusAggregator _hostStatuses = new();
private HostConnectivityForwarder? _transportForwarder;
private PerPlatformProbeWatcher? _probeWatcher;
// Cached health returned by GetHealth() unless the reconnect supervisor reports degraded.
// Starts as Unknown until InitializeAsync marks the driver Healthy.
private DriverHealth _health = new(DriverState.Unknown, null, null);
// Checked at the top of the public entry points; the code that sets it is outside this view.
private bool _disposed;
/// <summary>
/// Server-pushed data-change notification. Fires from the
/// <see cref="EventPump"/>'s background loop; handlers should be cheap (or queue
/// onto another thread) to avoid blocking the gw event stream.
/// </summary>
public event EventHandler<DataChangeEventArgs>? OnDataChange;
/// <summary>Fires when the gateway signals a deploy-time change (PR 4.6 DeployWatcher).</summary>
public event EventHandler<RediscoveryEventArgs>? OnRediscoveryNeeded;
/// <summary>Fires when a host transitions Running ↔ Stopped (PR 4.7 HostStatusAggregator).</summary>
public event EventHandler<HostStatusChangedEventArgs>? OnHostStatusChanged;
/// <inheritdoc />
public event EventHandler<AlarmEventArgs>? OnAlarmEvent;
/// <summary>
/// Production constructor. Forwards to the seam-accepting overload with every seam
/// left <see langword="null"/>.
/// </summary>
/// <param name="driverInstanceId">Non-blank identifier of this driver instance.</param>
/// <param name="options">Driver options; must not be <see langword="null"/>.</param>
/// <param name="logger">Optional logger; a no-op logger is used when omitted.</param>
public GalaxyDriver(
    string driverInstanceId,
    GalaxyDriverOptions options,
    ILogger<GalaxyDriver>? logger = null)
    : this(
        driverInstanceId: driverInstanceId,
        options: options,
        hierarchySource: null,
        dataReader: null,
        dataWriter: null,
        subscriber: null,
        alarmAcknowledger: null,
        alarmFeed: null,
        logger: logger)
{
}
/// <summary>
/// Seam-accepting constructor used by tests: any of the browse, read, write,
/// subscribe and alarm collaborators can be supplied so <see cref="DiscoverAsync"/>,
/// <see cref="ReadAsync"/>, <see cref="WriteAsync"/> and <see cref="SubscribeAsync"/>
/// run against canned data instead of real gRPC channels.
/// </summary>
/// <exception cref="ArgumentException"><paramref name="driverInstanceId"/> is null, empty or whitespace.</exception>
/// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
internal GalaxyDriver(
    string driverInstanceId,
    GalaxyDriverOptions options,
    IGalaxyHierarchySource? hierarchySource,
    IGalaxyDataReader? dataReader = null,
    IGalaxyDataWriter? dataWriter = null,
    IGalaxySubscriber? subscriber = null,
    IGalaxyAlarmAcknowledger? alarmAcknowledger = null,
    IGalaxyAlarmFeed? alarmFeed = null,
    ILogger<GalaxyDriver>? logger = null)
{
    // Validate in declaration order: instance id first, then options.
    if (string.IsNullOrWhiteSpace(driverInstanceId))
    {
        throw new ArgumentException("Driver instance id required.", nameof(driverInstanceId));
    }

    ArgumentNullException.ThrowIfNull(options);

    _driverInstanceId = driverInstanceId;
    _options = options;
    _logger = logger ?? NullLogger<GalaxyDriver>.Instance;

    // Optional seams — left null unless the caller supplied them.
    _hierarchySource = hierarchySource;
    _dataReader = dataReader;
    _dataWriter = dataWriter;
    _subscriber = subscriber;
    _alarmAcknowledger = alarmAcknowledger;
    _alarmFeed = alarmFeed;

    // Re-raise the aggregator's host-status transitions with this driver as sender.
    _hostStatuses.OnHostStatusChanged += (_, change) => OnHostStatusChanged?.Invoke(this, change);
}
/// <inheritdoc />
public string DriverInstanceId
{
    get { return _driverInstanceId; }
}
/// <inheritdoc />
public string DriverType
{
    get { return GalaxyDriverFactoryExtensions.DriverTypeName; }
}
/// <summary>The options instance this driver was constructed with; exposed for tests.</summary>
internal GalaxyDriverOptions Options
{
    get { return _options; }
}
/// <inheritdoc />
public async Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
{
    ObjectDisposedException.ThrowIf(_disposed, this);

    // driverConfigJson is not read here — configuration comes from the options passed
    // to the constructor. A subscriber, writer or hierarchy seam injected through the
    // internal ctor means a test is driving the wired surface, so the gateway client,
    // session and runtime components are only built when none of them is present.
    var hasInjectedSeam =
        _subscriber is not null || _dataWriter is not null || _hierarchySource is not null;

    if (hasInjectedSeam)
    {
        _logger.LogDebug(
            "GalaxyDriver {InstanceId} initializing with pre-injected seams — production runtime build skipped",
            _driverInstanceId);
    }
    else
    {
        await BuildProductionRuntimeAsync(cancellationToken).ConfigureAwait(false);
    }

    StartDeployWatcher();

    _logger.LogInformation(
        "GalaxyDriver {InstanceId} initialized — endpoint={Endpoint} clientName={ClientName}",
        _driverInstanceId, _options.Gateway.Endpoint, _options.MxAccess.ClientName);

    _health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
}
/// <summary>
/// Creates the gateway client, connects the MX session, and builds the runtime
/// components that depend on them: traced subscriber and writer, reconnect
/// supervisor (reopen → <see cref="ReopenAsync"/>, replay → <see cref="ReplayAsync"/>),
/// transport forwarder, per-platform probe watcher, and the alarm acknowledger / feed
/// when no seam was injected for them.
/// </summary>
private async Task BuildProductionRuntimeAsync(CancellationToken cancellationToken)
{
    var gatewayOptions = BuildClientOptions(_options.Gateway);
    var mxAccess = _options.MxAccess;

    // The owned fields are assigned before ConnectAsync runs, so they are already set
    // if the connect attempt throws.
    var client = MxGatewayClient.Create(gatewayOptions);
    _ownedMxClient = client;
    var session = new GalaxyMxSession(mxAccess, _logger);
    _ownedMxSession = session;
    await session.ConnectAsync(gatewayOptions, cancellationToken).ConfigureAwait(false);

    // PR 6.1 — the gw-facing seams are wrapped in tracing decorators so Subscribe /
    // Unsubscribe / Write / StreamEvents calls emit spans on the
    // "ZB.MOM.WW.OtOpcUa.Driver.Galaxy" ActivitySource without the driver depending on
    // the OpenTelemetry packages.
    IGalaxySubscriber subscriber = new TracedGalaxySubscriber(
        new GatewayGalaxySubscriber(session), mxAccess.ClientName);
    _subscriber = subscriber;
    _dataWriter = new TracedGalaxyDataWriter(
        new GatewayGalaxyDataWriter(session, mxAccess.WriteUserId, _logger),
        mxAccess.ClientName);

    var reconnect = _options.Reconnect;
    var backoff = new ReconnectOptions(
        InitialBackoffOverride: TimeSpan.FromMilliseconds(reconnect.InitialBackoffMs),
        MaxBackoffOverride: TimeSpan.FromMilliseconds(reconnect.MaxBackoffMs));
    var supervisor = new ReconnectSupervisor(
        reopen: ReopenAsync,
        replay: ReplayAsync,
        options: backoff,
        logger: _logger);
    _supervisor = supervisor;

    // Seed the transport as Running (we just connected) before listening for
    // supervisor transitions.
    var forwarder = new HostConnectivityForwarder(mxAccess.ClientName, _hostStatuses, _logger);
    _transportForwarder = forwarder;
    forwarder.SetTransport(HostState.Running);
    supervisor.StateChanged += OnSupervisorStateChanged;

    _probeWatcher = new PerPlatformProbeWatcher(
        subscriber, _hostStatuses, _logger,
        bufferedUpdateIntervalMs: mxAccess.PublishingIntervalMs);

    // Alarm acknowledger and feed are session-less, so they hang off the owned gateway
    // client rather than the MX session. Injected seams are kept as-is.
    _alarmAcknowledger ??= new GatewayGalaxyAlarmAcknowledger(client, _logger);
    _alarmFeed ??= new GatewayGalaxyAlarmFeed(
        client.StreamAlarmsAsync, _logger, mxAccess.ClientName);
}
/// <summary>
/// Reopen callback for <see cref="ReconnectSupervisor"/>. Rebuilds the gateway client
/// options and calls <c>ConnectAsync</c> on the owned MX session; does nothing when
/// the driver owns no session.
/// </summary>
private async Task ReopenAsync(CancellationToken cancellationToken)
{
    if (_ownedMxSession is not { } session)
    {
        return;
    }

    // Options are rebuilt on every attempt, so the API-key reference is re-resolved.
    var gatewayOptions = BuildClientOptions(_options.Gateway);
    await session.ConnectAsync(gatewayOptions, cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Replay callback for <see cref="ReconnectSupervisor"/>. Collects the distinct
/// (case-insensitive) tag references across every registered subscription binding and
/// re-issues a single SubscribeBulk for them at the configured
/// <c>MxAccess.PublishingIntervalMs</c>. Tags the gateway rejects are logged as
/// warnings rather than thrown, so one bad tag does not fail the whole replay.
/// PR 6.x can swap this for the gw's batched <c>ReplaySubscriptionsCommand</c> once
/// it ships.
/// </summary>
private async Task ReplayAsync(CancellationToken cancellationToken)
{
    if (_subscriber is not { } subscriber) return;

    var bindings = _subscriptions.SnapshotAllBindings();
    if (bindings.Count == 0) return;

    var refs = bindings
        .Select(b => b.FullReference)
        .Distinct(StringComparer.OrdinalIgnoreCase)
        .ToArray();

    // NOTE(review): the item handles returned here are not written back into the
    // subscription registry. If the gateway issues new handles after a reconnect the
    // registry would still map the old ones — confirm handle stability or rebind.
    var results = await subscriber
        .SubscribeBulkAsync(refs, _options.MxAccess.PublishingIntervalMs, cancellationToken)
        .ConfigureAwait(false);

    // Inspect the per-tag outcome instead of assuming every tag was accepted.
    var accepted = 0;
    foreach (var result in results)
    {
        if (result.WasSuccessful)
        {
            accepted++;
            continue;
        }

        _logger.LogWarning(
            "Galaxy replay subscribe for {FullRef} failed: {Error}",
            result.TagAddress, result.ErrorMessage);
    }

    // Tags with no result at all count as not re-subscribed.
    var rejected = Math.Max(0, refs.Length - accepted);
    if (rejected == 0)
    {
        _logger.LogInformation(
            "GalaxyDriver {InstanceId} replay completed — {Count} tags re-subscribed",
            _driverInstanceId, accepted);
    }
    else
    {
        _logger.LogWarning(
            "GalaxyDriver {InstanceId} replay completed with failures — {Accepted} of {Requested} tags re-subscribed",
            _driverInstanceId, accepted, refs.Length);
    }
}
/// <summary>
/// Mirrors a reconnect-supervisor transition into the cached driver health and the
/// transport forwarder: Healthy maps to Healthy / Running, every other state to
/// Degraded / Stopped.
/// </summary>
private void OnSupervisorStateChanged(object? sender, StateTransition transition)
{
    var recovered = transition.Next == ReconnectSupervisor.State.Healthy;

    if (recovered)
    {
        _health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
    }
    else
    {
        // Keep the last successful read time; carry the transition's cause as the error.
        _health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, transition.Cause);
    }

    _transportForwarder?.SetTransport(recovered ? HostState.Running : HostState.Stopped);
}
/// <summary>
/// Resolves <c>Gateway.ApiKeySecretRef</c> to the actual API key. Three forms are
/// supported, evaluated in order:
/// <list type="number">
/// <item><c>env:NAME</c> — reads <c>Environment.GetEnvironmentVariable(NAME)</c> and
/// trims surrounding whitespace. Throws when the variable is unset or blank, so a
/// misconfigured deployment fails fast instead of sending an empty key.</item>
/// <item><c>file:PATH</c> — reads the text of <c>PATH</c> and trims surrounding
/// whitespace. Throws when the file is missing or empty.</item>
/// <item>Anything else — used verbatim as the literal API key.</item>
/// </list>
/// The prefixes are matched case-insensitively.
/// </summary>
/// <param name="secretRef">The configured secret reference.</param>
/// <returns>The resolved, non-blank API key.</returns>
/// <exception cref="ArgumentException"><paramref name="secretRef"/> is null, empty or whitespace.</exception>
/// <exception cref="InvalidOperationException">
/// The referenced environment variable is unset or blank, or the referenced file is
/// missing or empty.
/// </exception>
internal static string ResolveApiKey(string secretRef)
{
    const string EnvPrefix = "env:";
    const string FilePrefix = "file:";

    // A whitespace-only reference can never yield a usable key; reject it up front.
    ArgumentException.ThrowIfNullOrWhiteSpace(secretRef);

    if (secretRef.StartsWith(EnvPrefix, StringComparison.OrdinalIgnoreCase))
    {
        var name = secretRef[EnvPrefix.Length..];
        var value = Environment.GetEnvironmentVariable(name);

        // Treat a whitespace-only value like an unset one, and trim for parity with
        // the file: arm.
        return !string.IsNullOrWhiteSpace(value)
            ? value.Trim()
            : throw new InvalidOperationException(
                $"Galaxy.Gateway.ApiKeySecretRef='{secretRef}' resolves to env var '{name}', but it is unset or blank.");
    }

    if (secretRef.StartsWith(FilePrefix, StringComparison.OrdinalIgnoreCase))
    {
        var path = secretRef[FilePrefix.Length..];
        var missingMessage =
            $"Galaxy.Gateway.ApiKeySecretRef='{secretRef}' points at '{path}', which doesn't exist.";
        if (!File.Exists(path))
        {
            throw new InvalidOperationException(missingMessage);
        }

        string contents;
        try
        {
            contents = File.ReadAllText(path).Trim();
        }
        catch (Exception ex) when (ex is FileNotFoundException or DirectoryNotFoundException)
        {
            // The file vanished between the existence check and the read — report it
            // the same way as a file that was never there.
            throw new InvalidOperationException(missingMessage, ex);
        }

        return !string.IsNullOrEmpty(contents)
            ? contents
            : throw new InvalidOperationException(
                $"Galaxy.Gateway.ApiKeySecretRef='{secretRef}' file '{path}' is empty.");
    }

    return secretRef;
}
/// <summary>
/// Maps the driver's gateway options onto <c>MxGatewayClientOptions</c>: parses the
/// endpoint as an absolute URI, resolves the API key through
/// <see cref="ResolveApiKey"/>, and converts the second-based timeouts to
/// <see cref="TimeSpan"/>. A non-positive stream timeout becomes <see langword="null"/>.
/// </summary>
private static MxGatewayClientOptions BuildClientOptions(GalaxyGatewayOptions gw)
{
    // Endpoint is parsed before the key is resolved, so a bad endpoint fails first.
    var endpoint = new Uri(gw.Endpoint, UriKind.Absolute);
    var apiKey = ResolveApiKey(gw.ApiKeySecretRef);

    return new MxGatewayClientOptions
    {
        Endpoint = endpoint,
        ApiKey = apiKey,
        UseTls = gw.UseTls,
        CaCertificatePath = gw.CaCertificatePath,
        ConnectTimeout = TimeSpan.FromSeconds(gw.ConnectTimeoutSeconds),
        DefaultCallTimeout = TimeSpan.FromSeconds(gw.DefaultCallTimeoutSeconds),
        StreamTimeout = gw.StreamTimeoutSeconds > 0 ? TimeSpan.FromSeconds(gw.StreamTimeoutSeconds) : null,
    };
}
/// <summary>
/// Starts the deploy watcher when <c>Repository.WatchDeployEvents</c> is enabled and
/// either a repository client or a hierarchy source is present. Its
/// rediscovery events are re-raised through <see cref="OnRediscoveryNeeded"/>.
/// </summary>
private void StartDeployWatcher()
{
    if (!_options.Repository.WatchDeployEvents)
    {
        return;
    }

    // NOTE(review): when InitializeAsync takes the production path, neither field
    // below has been assigned yet by the code visible here, so this guard would
    // return without starting the watcher — confirm it is started elsewhere.
    if (_ownedRepositoryClient is null && _hierarchySource is null)
    {
        return;
    }

    // Reuse the repository client when one already exists; otherwise build it now so
    // the watcher has a target.
    _ownedRepositoryClient ??= MxGateway.Client.GalaxyRepositoryClient.Create(
        BuildClientOptions(_options.Gateway));

    var watchSource = new GatewayGalaxyDeployWatchSource(_ownedRepositoryClient);
    _deployWatcher = new DeployWatcher(watchSource, _logger);
    _deployWatcher.OnRediscoveryNeeded += (_, change) => OnRediscoveryNeeded?.Invoke(this, change);

    // Fire-and-forget: the returned task is deliberately not awaited.
    _ = _deployWatcher.StartAsync(CancellationToken.None);
}
/// <inheritdoc />
public Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
{
    ObjectDisposedException.ThrowIf(_disposed, this);

    // Skeleton behaviour: the supplied config is not applied; only the cached health
    // is reset to Healthy with a fresh timestamp. Swapping the gateway-client options
    // in place is planned for the reconnect supervisor (PR 4.5).
    var refreshed = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
    _health = refreshed;
    return Task.CompletedTask;
}
/// <inheritdoc />
public Task ShutdownAsync(CancellationToken cancellationToken)
{
    // Nothing to do once disposed; otherwise log and drop the cached health back to
    // Unknown while keeping the last successful read time.
    if (!_disposed)
    {
        _logger.LogInformation("GalaxyDriver {InstanceId} shutting down", _driverInstanceId);
        _health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null);
    }

    return Task.CompletedTask;
}
/// <inheritdoc />
public DriverHealth GetHealth()
{
    var cached = _health;

    // While the reconnect supervisor reports degraded, that wins over the cached
    // health: surface Degraded with the supervisor's last error.
    return _supervisor is { IsDegraded: true } supervisor
        ? new DriverHealth(DriverState.Degraded, cached.LastSuccessfulRead, supervisor.LastError)
        : cached;
}
// ===== IHostConnectivityProbe (PR 4.7 wire-up) =====
/// <inheritdoc />
public IReadOnlyList<HostConnectivityStatus> GetHostStatuses()
{
    // Point-in-time snapshot taken from the host-status aggregator.
    return _hostStatuses.Snapshot();
}
/// <inheritdoc />
public long GetMemoryFootprint()
{
    // Placeholder: always 0 for now. PR 4.4 sets this from SubscriptionRegistry size.
    return 0;
}
/// <inheritdoc />
public Task FlushOptionalCachesAsync(CancellationToken cancellationToken)
{
    // Nothing is flushed here; completes immediately.
    return Task.CompletedTask;
}
// ===== ITagDiscovery (PR 4.1) =====
/// <inheritdoc />
public async Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken cancellationToken)
{
    ObjectDisposedException.ThrowIf(_disposed, this);
    ArgumentNullException.ThrowIfNull(builder);

    // PR 4.3 — wrap the caller's builder so each attribute's SecurityClassification is
    // captured into _securityByFullRef for use at write time.
    var securityCapture = new SecurityCapturingBuilder(builder, _securityByFullRef);
    var hierarchySource = _hierarchySource ??= BuildDefaultHierarchySource();

    await new GalaxyDiscoverer(hierarchySource)
        .DiscoverAsync(securityCapture, cancellationToken)
        .ConfigureAwait(false);

    if (_probeWatcher is null)
    {
        return;
    }

    // PR 4.W — refresh the probe watcher's membership so newly-added $WinPlatform /
    // $AppEngine objects are picked up. The sequence below is lazy; it is enumerated by
    // SyncPlatformsAsync.
    var objects = await hierarchySource.GetHierarchyAsync(cancellationToken).ConfigureAwait(false);
    var platformTags = objects
        .Where(obj => obj.TemplateChain.Any(IsPlatformTemplate))
        .Select(obj => obj.TagName)
        .Where(tag => !string.IsNullOrEmpty(tag));

    await _probeWatcher.SyncPlatformsAsync(platformTags, cancellationToken).ConfigureAwait(false);

    // Case-insensitive match against the two template names that mark a platform.
    static bool IsPlatformTemplate(string? template) =>
        string.Equals(template, "$WinPlatform", StringComparison.OrdinalIgnoreCase)
        || string.Equals(template, "$AppEngine", StringComparison.OrdinalIgnoreCase);
}
/// <summary>
/// Looks up the security classification captured for a tag during discovery; tags
/// with no captured entry fall back to <c>FreeAccess</c>.
/// </summary>
private SecurityClassification ResolveSecurity(string fullReference)
{
    if (_securityByFullRef.TryGetValue(fullReference, out var classification))
    {
        return classification;
    }

    return SecurityClassification.FreeAccess;
}
// ===== IReadable =====
/// <inheritdoc />
public Task<IReadOnlyList<DataValueSnapshot>> ReadAsync(
    IReadOnlyList<string> fullReferences, CancellationToken cancellationToken)
{
    ObjectDisposedException.ThrowIf(_disposed, this);
    ArgumentNullException.ThrowIfNull(fullReferences);

    // Nothing requested — answer with an empty list without touching any backend.
    if (fullReferences.Count == 0)
    {
        IReadOnlyList<DataValueSnapshot> nothingRead = [];
        return Task.FromResult(nothingRead);
    }

    // An injected reader (test seam from the internal ctor) takes precedence.
    if (_dataReader is { } reader)
    {
        return reader.ReadAsync(fullReferences, cancellationToken);
    }

    // Otherwise a read is synthesised from a one-shot subscription, which needs a subscriber.
    if (_subscriber is not null)
    {
        return ReadViaSubscribeOnceAsync(fullReferences, cancellationToken);
    }

    throw new NotSupportedException(
        "GalaxyDriver.ReadAsync requires a connected GalaxyMxSession (production runtime not built). " +
        "Either inject a test seam via the internal ctor or call InitializeAsync against a real gateway.");
}
/// <summary>
/// Production read path. MxAccess has no one-shot Read RPC — every value comes
/// through the event stream. A read is synthesised by:
/// <list type="number">
/// <item>Subscribing the requested tags through the existing
/// <see cref="SubscriptionRegistry"/> + <see cref="EventPump"/>.</item>
/// <item>Waiting for the first <c>OnDataChange</c> per requested reference.</item>
/// <item>Unsubscribing.</item>
/// </list>
/// Tags the gw rejects at SubscribeBulk time complete with a Bad snapshot; tags that
/// have not published when the caller's cancellation token fires complete with a
/// BadTimeout snapshot. The result always holds one snapshot per requested reference,
/// in input order. A failure of the SubscribeBulk call itself is rethrown.
/// </summary>
private async Task<IReadOnlyList<DataValueSnapshot>> ReadViaSubscribeOnceAsync(
    IReadOnlyList<string> fullReferences, CancellationToken cancellationToken)
{
    // OPC UA status codes. BadTimeout is 0x800A0000; the previously used 0x800B0000
    // is BadServiceUnsupported.
    const uint StatusBad = 0x80000000u;
    const uint StatusBadTimeout = 0x800A0000u;

    var pump = EnsureEventPumpStarted();
    var subscriber = _subscriber!;
    var subscriptionId = _subscriptions.NextSubscriptionId();

    // One TaskCompletionSource per distinct reference so the OnDataChange handler can
    // complete them out-of-order as events arrive.
    var pendingByRef = new Dictionary<string, TaskCompletionSource<DataValueSnapshot>>(
        StringComparer.OrdinalIgnoreCase);
    foreach (var fullRef in fullReferences.Distinct(StringComparer.OrdinalIgnoreCase))
    {
        pendingByRef[fullRef] = new TaskCompletionSource<DataValueSnapshot>(
            TaskCreationOptions.RunContinuationsAsynchronously);
    }

    EventHandler<DataChangeEventArgs> handler = (_, args) =>
    {
        // Only accept events for OUR subscription — the pump fans out across every
        // subscription on the driver.
        if (args.SubscriptionHandle is GalaxySubscriptionHandle gsh
            && gsh.SubscriptionId == subscriptionId
            && pendingByRef.TryGetValue(args.FullReference, out var tcs))
        {
            tcs.TrySetResult(args.Snapshot);
        }
    };

    var bindings = new List<TagBinding>(fullReferences.Count);

    // The handler is attached BEFORE SubscribeBulk so it is in place when events start
    // flowing. Everything from here on runs under one try/finally so the handler, the
    // registry entry and any live gateway handles are released on every exit path.
    // NOTE(review): bindings are only registered after SubscribeBulk returns; whether
    // an initial event arriving in that window can be missed depends on the pump and
    // registry, which are not visible here — confirm.
    pump.OnDataChange += handler;
    try
    {
        var results = await subscriber
            .SubscribeBulkAsync(fullReferences, _options.MxAccess.PublishingIntervalMs, cancellationToken)
            .ConfigureAwait(false);

        // Index results by tag address once (first result per address wins) instead of
        // scanning the whole result list for every requested reference.
        var resultByAddress = new Dictionary<string, SubscribeResult>(StringComparer.OrdinalIgnoreCase);
        foreach (var result in results)
        {
            if (result.TagAddress is { } address)
            {
                resultByAddress.TryAdd(address, result);
            }
        }

        foreach (var fullRef in fullReferences)
        {
            var itemHandle =
                resultByAddress.TryGetValue(fullRef, out var match) && match.WasSuccessful
                    ? match.ItemHandle
                    : 0;
            bindings.Add(new TagBinding(fullRef, itemHandle));

            // Rejected up front — complete with Bad now so the wait below does not
            // sit on it until cancellation.
            if (itemHandle <= 0 && pendingByRef.TryGetValue(fullRef, out var rejected))
            {
                rejected.TrySetResult(BadSnapshot(StatusBad));
            }
        }

        // Register bindings so the pump knows to dispatch events for these handles.
        _subscriptions.Register(subscriptionId, bindings);

        // When the caller's token fires first, fill every still-pending entry with
        // BadTimeout rather than throwing, so partial results are returned.
        using var registration = cancellationToken.Register(() =>
        {
            foreach (var tcs in pendingByRef.Values)
            {
                tcs.TrySetResult(BadSnapshot(StatusBadTimeout));
            }
        });

        var snapshots = new DataValueSnapshot[fullReferences.Count];
        for (var i = 0; i < fullReferences.Count; i++)
        {
            snapshots[i] = await pendingByRef[fullReferences[i]].Task.ConfigureAwait(false);
        }

        return snapshots;
    }
    finally
    {
        pump.OnDataChange -= handler;

        // Drop the bindings (a no-op when registration never happened), then release
        // the live handles. An unsubscribe failure is logged, not thrown — the registry
        // is already cleared on the driver side.
        _subscriptions.Remove(subscriptionId);
        var liveHandles = bindings.Where(b => b.ItemHandle > 0).Select(b => b.ItemHandle).ToArray();
        if (liveHandles.Length > 0)
        {
            try
            {
                await subscriber.UnsubscribeBulkAsync(liveHandles, CancellationToken.None)
                    .ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex,
                    "GalaxyDriver.ReadViaSubscribeOnceAsync UnsubscribeBulk failed for {Count} handle(s) — registry already cleared.",
                    liveHandles.Length);
            }
        }
    }

    // Builds a value-less snapshot with the given status, stamped with the current UTC time.
    static DataValueSnapshot BadSnapshot(uint status) => new(
        Value: null,
        StatusCode: status,
        SourceTimestampUtc: null,
        ServerTimestampUtc: DateTime.UtcNow);
}
// ===== IWritable (PR 4.3) =====
/// <inheritdoc />
public Task<IReadOnlyList<WriteResult>> WriteAsync(
    IReadOnlyList<WriteRequest> writes, CancellationToken cancellationToken)
{
    ObjectDisposedException.ThrowIf(_disposed, this);
    ArgumentNullException.ThrowIfNull(writes);

    // Nothing to write — answer with an empty result list.
    if (writes.Count == 0)
    {
        IReadOnlyList<WriteResult> nothingWritten = [];
        return Task.FromResult(nothingWritten);
    }

    // The writer receives ResolveSecurity so it can look up each tag's
    // SecurityClassification as captured during discovery.
    if (_dataWriter is { } writer)
    {
        return writer.WriteAsync(writes, ResolveSecurity, cancellationToken);
    }

    // No writer wired (neither injected nor built by InitializeAsync).
    throw new NotSupportedException(
        "GalaxyDriver.WriteAsync requires GatewayGalaxyDataWriter wired against a connected " +
        "GalaxyMxSession (PR 4.4). Until that lands, route writes through the legacy-host " +
        "backend (Galaxy:Backend=legacy-host).");
}
// ===== ISubscribable (PR 4.4) =====
/// <inheritdoc />
public async Task<ISubscriptionHandle> SubscribeAsync(
    IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
{
    ObjectDisposedException.ThrowIf(_disposed, this);
    ArgumentNullException.ThrowIfNull(fullReferences);
    if (_subscriber is not { } subscriber)
    {
        throw new NotSupportedException(
            "GalaxyDriver.SubscribeAsync requires a connected GalaxyMxSession + GatewayGalaxySubscriber. " +
            "PR 4.W wires the production session; until then route subscriptions through the legacy-host backend.");
    }

    // Make sure the shared event pump is running before any tag is subscribed.
    EnsureEventPumpStarted();
    var subscriptionId = _subscriptions.NextSubscriptionId();

    if (fullReferences.Count == 0)
    {
        // Empty subscriptions register but never bind anything — keeps Unsubscribe
        // symmetric for callers that conditionally add tags later.
        _subscriptions.Register(subscriptionId, []);
        return new GalaxySubscriptionHandle(subscriptionId);
    }

    // PR 6.3 — a publishing interval below one millisecond (zero, negative or
    // sub-millisecond) falls back to the configured MxAccess.PublishingIntervalMs.
    // Larger values are truncated to whole milliseconds and clamped to int.MaxValue so
    // a very large interval (e.g. TimeSpan.MaxValue) cannot overflow the cast.
    var requestedMs = publishingInterval.TotalMilliseconds;
    var bufferedIntervalMs = requestedMs >= 1
        ? (int)Math.Min(requestedMs, int.MaxValue)
        : _options.MxAccess.PublishingIntervalMs;

    var results = await subscriber
        .SubscribeBulkAsync(fullReferences, bufferedIntervalMs, cancellationToken)
        .ConfigureAwait(false);

    // Index results by tag address once (first result per address wins) so matching
    // stays linear for large bulk subscribes.
    var resultByAddress = new Dictionary<string, SubscribeResult>(StringComparer.OrdinalIgnoreCase);
    foreach (var result in results)
    {
        if (result.TagAddress is { } address)
        {
            resultByAddress.TryAdd(address, result);
        }
    }

    // Build the binding list in input order. Tags the gw rejected (or returned no
    // result for) are recorded with item handle 0 and logged.
    var bindings = new List<TagBinding>(fullReferences.Count);
    foreach (var fullRef in fullReferences)
    {
        var match = fullRef is not null && resultByAddress.TryGetValue(fullRef, out var found)
            ? found
            : null;
        var itemHandle = match is { WasSuccessful: true } ? match.ItemHandle : 0;
        bindings.Add(new TagBinding(fullRef, itemHandle));

        if (match is null || !match.WasSuccessful)
        {
            _logger.LogWarning(
                "Galaxy subscribe for {FullRef} failed: {Error}",
                fullRef, match?.ErrorMessage ?? "<no result returned>");
        }
    }

    _subscriptions.Register(subscriptionId, bindings);
    return new GalaxySubscriptionHandle(subscriptionId);
}
/// <inheritdoc />
public async Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
{
    ObjectDisposedException.ThrowIf(_disposed, this);
    ArgumentNullException.ThrowIfNull(handle);

    if (handle is not GalaxySubscriptionHandle issued)
    {
        var message =
            $"Subscription handle was not issued by this driver (expected GalaxySubscriptionHandle, got {handle.GetType().Name}).";
        throw new ArgumentException(message, nameof(handle));
    }

    // Clear the driver-side registry first; a null result means the subscription was
    // already removed or never registered.
    var released = _subscriptions.Remove(issued.SubscriptionId);
    if (released is null)
    {
        return;
    }

    // Only bindings with a positive item handle were accepted by the gateway.
    var gatewayHandles = (
        from binding in released
        where binding.ItemHandle > 0
        select binding.ItemHandle).ToArray();
    if (gatewayHandles.Length == 0 || _subscriber is not { } subscriber)
    {
        return;
    }

    try
    {
        await subscriber.UnsubscribeBulkAsync(gatewayHandles, cancellationToken).ConfigureAwait(false);
    }
    catch (Exception ex)
    {
        // Best effort: the registry is already cleared, so the failure is only logged.
        _logger.LogWarning(ex,
            "Galaxy UnsubscribeBulk failed for subscription {SubscriptionId} — registry already cleared on driver side.",
            issued.SubscriptionId);
    }
}
/// <summary>
/// Returns the driver's single <see cref="EventPump"/>, creating and starting it on
/// first use under <c>_pumpLock</c>. Every subscription shares this pump; fan-out
/// goes through the <see cref="SubscriptionRegistry"/> rather than one pump per
/// subscription.
/// </summary>
private EventPump EnsureEventPumpStarted()
{
    lock (_pumpLock)
    {
        if (_eventPump is null)
        {
            // The field is assigned before Start() runs, so it stays set even if
            // Start() throws. Stream faults are routed to OnEventPumpStreamFault.
            _eventPump = new EventPump(
                _subscriber!, _subscriptions, _logger,
                channelCapacity: _options.MxAccess.EventPumpChannelCapacity,
                clientName: _options.MxAccess.ClientName,
                onStreamFault: OnEventPumpStreamFault);
            _eventPump.OnDataChange += OnPumpDataChange;
            _eventPump.Start();
        }

        return _eventPump;
    }
}
/// <summary>
/// Stream-fault callback handed to the <see cref="EventPump"/>. Forwards the fault
/// to the <see cref="ReconnectSupervisor"/> via <c>ReportTransportFailure</c>; when
/// no supervisor is wired, the fault is only logged.
/// </summary>
private void OnEventPumpStreamFault(Exception cause)
{
    if (_supervisor is { } supervisor)
    {
        try
        {
            supervisor.ReportTransportFailure(cause);
        }
        catch (ObjectDisposedException)
        {
            // Driver is being disposed — the stream fault is just shutdown noise.
        }

        return;
    }

    // No production runtime (skeleton / injected-seam path) — nothing to drive.
    _logger.LogWarning(cause,
        "GalaxyDriver {InstanceId} event stream faulted but no reconnect supervisor is wired.",
        _driverInstanceId);
}
// ===== IAlarmSource =====
/// <summary>
/// Start the gateway alarm feed (idempotent) and wire its transitions onto this
/// driver's <see cref="OnAlarmEvent"/> bridge. The feed is session-less — it does
/// not depend on a data subscription or the <see cref="EventPump"/>.
/// </summary>
/// <remarks>
/// If the feed's <c>Start()</c> throws, the transition handler attached just
/// before it is detached again and the exception is rethrown. The wired flag is
/// left unset, so a later call retries from a clean state instead of attaching a
/// second copy of the handler (which would deliver every transition twice).
/// </remarks>
/// <exception cref="InvalidOperationException">
/// No alarm feed has been wired (neither by <c>InitializeAsync</c> nor through the
/// internal constructor seam).
/// </exception>
private void EnsureAlarmFeedStarted()
{
    lock (_alarmFeedLock)
    {
        if (_alarmFeed is null)
        {
            throw new InvalidOperationException(
                "GalaxyDriver alarm feed is not wired. InitializeAsync must run (or a feed " +
                "seam must be injected via the internal ctor) before subscribing to alarms.");
        }

        if (_alarmFeedWired) return;

        // Attach before Start so no transition raised during start-up is missed.
        _alarmFeed.OnAlarmTransition += OnAlarmFeedTransition;
        try
        {
            _alarmFeed.Start();
        }
        catch
        {
            // Roll back the subscription: the wired flag stays false, so the next
            // attempt re-attaches. Leaving this handler in place would duplicate it.
            _alarmFeed.OnAlarmTransition -= OnAlarmFeedTransition;
            throw;
        }

        _alarmFeedWired = true;
    }
}
/// <inheritdoc />
/// <remarks>
/// Completes synchronously. Alarm subscriptions are not multiplexed per source
/// node: every active handle sees every transition from the gateway's session-less
/// StreamAlarms feed and the server filters by source node before raising Part 9
/// conditions. The returned handle is only a sentinel the server passes back to
/// <see cref="UnsubscribeAlarmsAsync"/> — the same shape AbCip uses.
/// </remarks>
public Task<IAlarmSubscriptionHandle> SubscribeAlarmsAsync(
    IReadOnlyList<string> sourceNodeIds, CancellationToken cancellationToken)
{
    ObjectDisposedException.ThrowIf(_disposed, this);
    ArgumentNullException.ThrowIfNull(sourceNodeIds);

    // Make sure the shared feed is running before the handle is registered.
    EnsureAlarmFeedStarted();

    var subscriptionId = Guid.NewGuid().ToString("N");
    var issued = new GalaxyAlarmSubscriptionHandle(subscriptionId);

    lock (_alarmHandlersLock)
    {
        _alarmSubscriptions.Add(issued);
    }

    IAlarmSubscriptionHandle result = issued;
    return Task.FromResult(result);
}
/// <inheritdoc />
/// <remarks>
/// Completes synchronously: the handle is simply removed from the driver's set of
/// active alarm subscriptions.
/// </remarks>
/// <exception cref="ArgumentException">
/// <paramref name="handle"/> is not a <c>GalaxyAlarmSubscriptionHandle</c>.
/// </exception>
public Task UnsubscribeAlarmsAsync(IAlarmSubscriptionHandle handle, CancellationToken cancellationToken)
{
    ObjectDisposedException.ThrowIf(_disposed, this);
    ArgumentNullException.ThrowIfNull(handle);

    if (handle is GalaxyAlarmSubscriptionHandle owned)
    {
        lock (_alarmHandlersLock)
        {
            _alarmSubscriptions.Remove(owned);
        }

        return Task.CompletedTask;
    }

    // Anything else is a foreign handle type.
    throw new ArgumentException(
        $"Subscription handle was not issued by this driver (expected GalaxyAlarmSubscriptionHandle, got {handle.GetType().Name}).",
        nameof(handle));
}
/// <inheritdoc />
/// <remarks>
/// An empty batch is a no-op. Otherwise each request is forwarded to the alarm
/// acknowledger one at a time, in list order, awaiting each call before issuing
/// the next.
/// </remarks>
/// <exception cref="NotSupportedException">
/// No alarm acknowledger is wired and the batch is non-empty.
/// </exception>
public async Task AcknowledgeAsync(
    IReadOnlyList<AlarmAcknowledgeRequest> acknowledgements, CancellationToken cancellationToken)
{
    ObjectDisposedException.ThrowIf(_disposed, this);
    ArgumentNullException.ThrowIfNull(acknowledgements);

    if (acknowledgements.Count == 0)
    {
        return;
    }

    if (_alarmAcknowledger is null)
    {
        throw new NotSupportedException(
            "GalaxyDriver.AcknowledgeAsync requires GatewayGalaxyAlarmAcknowledger wired against a connected " +
            "GalaxyMxSession (PR B.2). InitializeAsync must run before alarm acknowledgements can flow.");
    }

    // The gateway RPC takes a single alarm reference per call, so the batch is
    // sent serially. The server-side ACL layer's per-condition Acknowledge is the
    // natural rate limit; going in series keeps operator-comment ordering
    // deterministic and avoids bursting the worker's STA queue.
    foreach (var request in acknowledgements)
    {
        // For this driver ConditionId holds the alarm full reference. SourceNodeId
        // is the OPC UA browse path, which the gateway cannot address, so it is
        // only the fallback when no ConditionId was supplied. The server-side
        // condition state pairs the two through AlarmConditionService.
        var target = string.IsNullOrEmpty(request.ConditionId)
            ? request.SourceNodeId
            : request.ConditionId;
        var comment = request.Comment ?? string.Empty;

        await _alarmAcknowledger.AcknowledgeAsync(
            target,
            comment,
            operatorUser: string.Empty, // server-side ACL fills this from the OPC UA session
            cancellationToken).ConfigureAwait(false);
    }
}
/// <summary>
/// Handler for the gateway alarm feed. Converts a
/// <see cref="GalaxyAlarmTransition"/> into <see cref="AlarmEventArgs"/> and
/// raises <see cref="OnAlarmEvent"/>. Nothing is raised while no alarm
/// subscription is active, so a server that has not yet called
/// <see cref="SubscribeAlarmsAsync"/> never sees untracked transitions.
/// Exceptions thrown by event subscribers are logged and swallowed.
/// </summary>
/// <param name="sender">The feed raising the transition (unused).</param>
/// <param name="transition">The alarm transition reported by the gateway.</param>
private void OnAlarmFeedTransition(object? sender, GalaxyAlarmTransition transition)
{
    // Any one active handle serves as the event's "owner". The server-side state
    // machine does not multiplex by handle today: with several subscriptions
    // active the event is still raised once and AlarmConditionService dispatches
    // per source node downstream.
    GalaxyAlarmSubscriptionHandle? owner = null;
    lock (_alarmHandlersLock)
    {
        if (_alarmSubscriptions.Count > 0)
        {
            owner = _alarmSubscriptions.First();
        }
    }

    if (owner is null)
    {
        return;
    }

    // Empty optional strings are surfaced as null.
    var operatorComment = string.IsNullOrEmpty(transition.OperatorComment)
        ? null
        : transition.OperatorComment;
    var category = string.IsNullOrEmpty(transition.Category)
        ? null
        : transition.Category;

    var alarmEvent = new AlarmEventArgs(
        SubscriptionHandle: owner,
        SourceNodeId: transition.SourceObjectReference,
        ConditionId: transition.AlarmFullReference,
        AlarmType: transition.AlarmTypeName,
        Message: transition.Description,
        Severity: transition.SeverityBucket,
        SourceTimestampUtc: transition.TransitionTimestampUtc,
        OperatorComment: operatorComment,
        OriginalRaiseTimestampUtc: transition.OriginalRaiseTimestampUtc,
        AlarmCategory: category);

    try
    {
        OnAlarmEvent?.Invoke(this, alarmEvent);
    }
    catch (Exception ex)
    {
        // A misbehaving subscriber is logged and ignored so it cannot propagate
        // back into the feed that raised this transition.
        _logger.LogWarning(ex,
            "GalaxyDriver OnAlarmEvent handler threw for {AlarmRef} — continuing.",
            transition.AlarmFullReference);
    }
}
/// <summary>
/// Bridge for pump fan-out events. Each event is re-raised on the public
/// <see cref="OnDataChange"/> for ISubscribable consumers; events whose reference
/// ends with <see cref="PerPlatformProbeWatcher.ProbeSuffix"/> are additionally
/// handed to the per-platform probe watcher (PR 4.7), so platform health entries
/// update without the watcher consuming the event stream itself.
/// </summary>
/// <param name="sender">The pump raising the event (unused).</param>
/// <param name="args">The data-change payload.</param>
private void OnPumpDataChange(object? sender, DataChangeEventArgs args)
{
    OnDataChange?.Invoke(this, args);

    var isProbe = _probeWatcher is not null
        && args.FullReference.EndsWith(PerPlatformProbeWatcher.ProbeSuffix, StringComparison.OrdinalIgnoreCase);
    if (!isProbe)
    {
        return;
    }

    // The probe decoder wants a raw quality byte. Rebuild it from the two
    // most-significant bits of the StatusCode: 0 → 192, 1 → 64, anything else → 0
    // (the original comment labels these Good, Uncertain and Bad respectively).
    var severityBits = (byte)((args.Snapshot.StatusCode >> 30) & 0x3);
    byte quality;
    if (severityBits == 0)
    {
        quality = 192;
    }
    else if (severityBits == 1)
    {
        quality = 64;
    }
    else
    {
        quality = 0;
    }

    _probeWatcher!.OnProbeValueChanged(args.FullReference, args.Snapshot.Value, quality);
}
/// <summary>
/// Builds the default <see cref="IGalaxyHierarchySource"/> from
/// <c>_options.Gateway</c>. The <see cref="GalaxyRepositoryClient"/> created here
/// is kept in <c>_ownedRepositoryClient</c> and released by <see cref="Dispose"/>.
/// Tests bypass this by injecting their own source via the internal ctor.
/// </summary>
/// <returns>
/// A <see cref="TracedGalaxyHierarchySource"/> wrapping a gateway-backed source.
/// </returns>
private IGalaxyHierarchySource BuildDefaultHierarchySource()
{
    var gateway = _options.Gateway;

    var endpoint = new Uri(gateway.Endpoint, UriKind.Absolute);
    var apiKey = ResolveApiKey(gateway.ApiKeySecretRef);
    var connectTimeout = TimeSpan.FromSeconds(gateway.ConnectTimeoutSeconds);
    var callTimeout = TimeSpan.FromSeconds(gateway.DefaultCallTimeoutSeconds);

    // A non-positive configured stream timeout leaves the option unset (null).
    TimeSpan? streamTimeout = null;
    if (gateway.StreamTimeoutSeconds > 0)
    {
        streamTimeout = TimeSpan.FromSeconds(gateway.StreamTimeoutSeconds);
    }

    var repositoryClient = GalaxyRepositoryClient.Create(new MxGatewayClientOptions
    {
        Endpoint = endpoint,
        ApiKey = apiKey,
        UseTls = gateway.UseTls,
        CaCertificatePath = gateway.CaCertificatePath,
        ConnectTimeout = connectTimeout,
        DefaultCallTimeout = callTimeout,
        StreamTimeout = streamTimeout,
    });

    // Remember the client so Dispose can release it.
    _ownedRepositoryClient = repositoryClient;

    var gatewaySource = new GatewayGalaxyHierarchySource(repositoryClient);
    return new TracedGalaxyHierarchySource(gatewaySource, _options.MxAccess.ClientName);
}
/// <summary>
/// Releases everything the driver owns. Safe to call more than once: calls after
/// the first return immediately.
/// </summary>
/// <remarks>
/// Order: deploy watcher, reconnect supervisor, probe watcher, transport
/// forwarder, event pump, alarm feed, then the owned MX session, MX client and
/// repository client. Every step is best-effort — a failure is logged as a
/// warning and the remaining steps still run, so disposing a faulted driver
/// neither throws nor leaks the resources that come later in the sequence.
/// Async-disposable members are awaited synchronously because
/// <see cref="IDisposable.Dispose"/> has no asynchronous form.
/// </remarks>
public void Dispose()
{
    if (_disposed) return;
    _disposed = true;

    try { _deployWatcher?.Dispose(); } catch (Exception ex) { _logger.LogWarning(ex, "DeployWatcher dispose failed"); }
    try { _supervisor?.Dispose(); } catch (Exception ex) { _logger.LogWarning(ex, "ReconnectSupervisor dispose failed"); }
    try { _probeWatcher?.Dispose(); } catch (Exception ex) { _logger.LogWarning(ex, "ProbeWatcher dispose failed"); }
    try { _transportForwarder?.Dispose(); } catch (Exception ex) { _logger.LogWarning(ex, "Transport forwarder dispose failed"); }

    // Detach the pump under its lock, then dispose it outside the lock.
    EventPump? pump;
    lock (_pumpLock) { pump = _eventPump; _eventPump = null; }
    try { pump?.DisposeAsync().AsTask().GetAwaiter().GetResult(); }
    catch (Exception ex) { _logger.LogWarning(ex, "EventPump dispose failed"); }

    // Same detach-then-dispose shape for the alarm feed.
    IGalaxyAlarmFeed? alarmFeed;
    lock (_alarmFeedLock) { alarmFeed = _alarmFeed; _alarmFeed = null; }
    try { alarmFeed?.DisposeAsync().AsTask().GetAwaiter().GetResult(); }
    catch (Exception ex) { _logger.LogWarning(ex, "Alarm feed dispose failed"); }

    // Owned session and clients go last. Each field is cleared even when its
    // disposal fails so no reference to a half-disposed object is kept.
    try { _ownedMxSession?.DisposeAsync().AsTask().GetAwaiter().GetResult(); }
    catch (Exception ex) { _logger.LogWarning(ex, "MxSession dispose failed"); }
    _ownedMxSession = null;

    try { _ownedMxClient?.DisposeAsync().AsTask().GetAwaiter().GetResult(); }
    catch (Exception ex) { _logger.LogWarning(ex, "MxGateway client dispose failed"); }
    _ownedMxClient = null;

    try { _ownedRepositoryClient?.DisposeAsync().AsTask().GetAwaiter().GetResult(); }
    catch (Exception ex) { _logger.LogWarning(ex, "Repository client dispose failed"); }
    _ownedRepositoryClient = null;

    _hierarchySource = null;
}
/// <summary>
/// Decorator over an <see cref="IAddressSpaceBuilder"/> that notes every
/// variable's <see cref="DriverAttributeInfo.SecurityClass"/> in a shared
/// dictionary, keyed by the attribute's full name, and then forwards the call to
/// the wrapped builder. <see cref="DiscoverAsync"/> uses it to collect per-tag
/// classifications for the IWritable routing decision (PR 4.3) without the
/// discoverer having to know anything about the driver's internal state.
/// </summary>
private sealed class SecurityCapturingBuilder : IAddressSpaceBuilder
{
    private readonly IAddressSpaceBuilder _inner;
    private readonly System.Collections.Concurrent.ConcurrentDictionary<string, SecurityClassification> _map;

    /// <param name="inner">Builder that actually materialises the address space.</param>
    /// <param name="map">Destination for the captured classifications.</param>
    public SecurityCapturingBuilder(
        IAddressSpaceBuilder inner,
        System.Collections.Concurrent.ConcurrentDictionary<string, SecurityClassification> map)
    {
        _inner = inner;
        _map = map;
    }

    /// <summary>
    /// Creates the folder on the wrapped builder and wraps the returned child
    /// builder with the same map, so variables added beneath it are captured too.
    /// </summary>
    public IAddressSpaceBuilder Folder(string browseName, string displayName)
    {
        var childBuilder = _inner.Folder(browseName, displayName);
        return new SecurityCapturingBuilder(childBuilder, _map);
    }

    /// <summary>
    /// Records the variable's security class (a later entry with the same full
    /// name overwrites an earlier one), then delegates to the wrapped builder.
    /// </summary>
    public IVariableHandle Variable(string browseName, string displayName, DriverAttributeInfo attributeInfo)
    {
        _map[attributeInfo.FullName] = attributeInfo.SecurityClass;
        return _inner.Variable(browseName, displayName, attributeInfo);
    }

    /// <summary>Pure pass-through to the wrapped builder.</summary>
    public void AddProperty(string browseName, DriverDataType dataType, object? value)
    {
        _inner.AddProperty(browseName, dataType, value);
    }
}
}