Merge branch 'worktree-agent-adfb71e38279b8f48' into feat/scripted-alarm-shelve-routing

This commit is contained in:
Joseph Doherty
2026-05-22 10:22:56 -04:00
5 changed files with 296 additions and 75 deletions

View File

@@ -26,7 +26,7 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy;
/// "GalaxyMxGateway" so both paths can be live simultaneously during parity testing.
/// </remarks>
public sealed class GalaxyDriver
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IRediscoverable, IHostConnectivityProbe, IAlarmSource, IDisposable
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IRediscoverable, IHostConnectivityProbe, IAlarmSource, IDisposable, IAsyncDisposable
{
private readonly string _driverInstanceId;
private readonly GalaxyDriverOptions _options;
@@ -75,7 +75,10 @@ public sealed class GalaxyDriver
private readonly Lock _alarmHandlersLock = new();
private readonly Lock _alarmFeedLock = new();
private bool _alarmFeedWired;
private readonly HashSet<GalaxyAlarmSubscriptionHandle> _alarmSubscriptions = new();
// List preserves insertion order so OnAlarmFeedTransition always picks the
// earliest-registered handle — a deterministic choice that doesn't vary as
// handles are added/removed (Driver.Galaxy-006 fix: HashSet.First() is unstable).
private readonly List<GalaxyAlarmSubscriptionHandle> _alarmSubscriptions = new();
// PR 4.W — production runtime owned by InitializeAsync. The driver builds these
// when it opens a real gw session; tests bypass them by injecting seams via the
@@ -442,17 +445,21 @@ public sealed class GalaxyDriver
// Reuse the lazily-built repository client (DiscoverAsync constructs it on demand).
// If discovery hasn't run yet, build the client here so the watcher has a target.
if (_ownedRepositoryClient is null)
{
_ownedRepositoryClient = MxGateway.Client.GalaxyRepositoryClient.Create(
BuildClientOptions(_options.Gateway));
}
// Driver.Galaxy-009 fix: guard with ??= so if BuildDefaultHierarchySource later runs
// it reuses this client rather than overwriting the field and leaking the first instance.
_ownedRepositoryClient ??= MxGateway.Client.GalaxyRepositoryClient.Create(
BuildClientOptions(_options.Gateway));
var source = new GatewayGalaxyDeployWatchSource(_ownedRepositoryClient);
_deployWatcher = new DeployWatcher(source, _logger);
_deployWatcher.OnRediscoveryNeeded += (_, args) => OnRediscoveryNeeded?.Invoke(this, args);
_ = _deployWatcher.StartAsync(CancellationToken.None);
// StartAsync schedules the background loop and returns Task.CompletedTask immediately.
// It throws InvalidOperationException synchronously if called twice (programming error).
// Driver.Galaxy-009 fix: don't discard the return value — observe any synchronous throw.
var startTask = _deployWatcher.StartAsync(CancellationToken.None);
// The task is already completed (StartAsync is synchronous); surface any synchronous fault.
if (startTask.IsFaulted) startTask.GetAwaiter().GetResult();
}
/// <inheritdoc />
@@ -492,7 +499,22 @@ public sealed class GalaxyDriver
public IReadOnlyList<HostConnectivityStatus> GetHostStatuses() => _hostStatuses.Snapshot();
/// <inheritdoc />
public long GetMemoryFootprint() => 0; // PR 4.4 sets this from SubscriptionRegistry size.
/// <remarks>
/// Estimated footprint: 64 bytes × tracked item handles (one gw subscription entry
/// per bound tag) + 256 bytes × tracked driver subscriptions (registry overhead per
/// OPC UA monitored item). Returns 0 when no subscriptions are active. These
/// constants are conservative — a 50k-tag set occupies ~3 MB and registers clearly
/// with the server's cache-flush heuristic. Driver.Galaxy-011: the stale
/// "PR 4.4 sets this" comment is removed; PR 4.4 shipped the SubscriptionRegistry
/// but never wired it here.
/// </remarks>
public long GetMemoryFootprint()
{
const long BytesPerItemHandle = 64L; // TagBinding + reverse-map entry
const long BytesPerSubscription = 256L; // SubscriptionEntry overhead
return (_subscriptions.TrackedItemHandleCount * BytesPerItemHandle)
+ (_subscriptions.TrackedSubscriptionCount * BytesPerSubscription);
}
/// <inheritdoc />
public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask;
@@ -964,12 +986,15 @@ public sealed class GalaxyDriver
GalaxyAlarmSubscriptionHandle? handle;
lock (_alarmHandlersLock)
{
// Pick any active subscription handle as the "owner" of the event. The
// server-side state machine doesn't multiplex by handle today; if multiple
// alarm subscriptions are active we still only fire the event once and
// the AlarmConditionService dispatches per-source-node downstream.
// Pick the earliest-registered handle as the event owner. The server routes
// by SourceNodeId (not by handle), so every active subscriber sees the same
// transition regardless of which handle is attached here. Using the first
// insertion-order entry is deterministic and stable as long as at least one
// subscription remains — HashSet.First() was unstable across mutations
// (Driver.Galaxy-006 fix). _alarmSubscriptions is a List, so [0] is always
// the earliest-registered handle.
handle = _alarmSubscriptions.Count > 0
? _alarmSubscriptions.First()
? _alarmSubscriptions[0]
: null;
}
if (handle is null) return;
@@ -1010,15 +1035,11 @@ public sealed class GalaxyDriver
if (_probeWatcher is not null
&& args.FullReference.EndsWith(PerPlatformProbeWatcher.ProbeSuffix, StringComparison.OrdinalIgnoreCase))
{
// The probe decoder takes a raw quality byte; recover it from the StatusCode
// top byte (Good=0x00 → byte 192, Uncertain=0x40 → byte 64, Bad=0x80 → byte 0).
var qualityByte = (byte)((args.Snapshot.StatusCode >> 30) & 0x3) switch
{
0 => 192,
1 => 64,
_ => 0,
};
_probeWatcher.OnProbeValueChanged(args.FullReference, args.Snapshot.Value, (byte)qualityByte);
// The probe decoder takes a raw quality byte. Recover it via the canonical
// StatusCodeMap.ToQualityCategoryByte helper so the mapping lives in one
// place next to its inverse (FromQualityByte) and cannot desync silently.
var qualityByte = StatusCodeMap.ToQualityCategoryByte(args.Snapshot.StatusCode);
_probeWatcher.OnProbeValueChanged(args.FullReference, args.Snapshot.Value, qualityByte);
}
}
@@ -1030,57 +1051,81 @@ public sealed class GalaxyDriver
/// </summary>
private IGalaxyHierarchySource BuildDefaultHierarchySource()
{
var gw = _options.Gateway;
var clientOptions = new MxGatewayClientOptions
{
Endpoint = new Uri(gw.Endpoint, UriKind.Absolute),
ApiKey = ResolveApiKey(gw.ApiKeySecretRef),
UseTls = gw.UseTls,
CaCertificatePath = gw.CaCertificatePath,
ConnectTimeout = TimeSpan.FromSeconds(gw.ConnectTimeoutSeconds),
DefaultCallTimeout = TimeSpan.FromSeconds(gw.DefaultCallTimeoutSeconds),
StreamTimeout = gw.StreamTimeoutSeconds > 0
? TimeSpan.FromSeconds(gw.StreamTimeoutSeconds)
: null,
};
_ownedRepositoryClient = GalaxyRepositoryClient.Create(clientOptions);
// Driver.Galaxy-009 fix: reuse a client that StartDeployWatcher may have already
// created (??=) rather than always overwriting the field and leaking the first
// instance. Both paths produce equivalent clients from the same options.
_ownedRepositoryClient ??= GalaxyRepositoryClient.Create(BuildClientOptions(_options.Gateway));
return new TracedGalaxyHierarchySource(
new GatewayGalaxyHierarchySource(_ownedRepositoryClient), _options.MxAccess.ClientName);
}
public void Dispose()
/// <summary>
/// Asynchronous disposal. Prefer <c>await using</c> over <c>using</c> — the
/// async path does not block the caller while awaiting EventPump / session /
/// client shutdown (Driver.Galaxy-007: the sync path blocked on
/// <c>GetAwaiter().GetResult()</c> for every async sub-component, risking a
/// deadlock under thread-pool starvation).
/// </summary>
public async ValueTask DisposeAsync()
{
if (_disposed) return;
_disposed = true;
// Order: stop deploy watcher, supervisor, probe watcher, pump, then sessions and
// clients. Each step is best-effort — disposal during a faulted state shouldn't
// throw and prevent the rest of the cleanup.
// Synchronous sub-components first — none of these block.
try { _deployWatcher?.Dispose(); } catch (Exception ex) { _logger.LogWarning(ex, "DeployWatcher dispose failed"); }
try { _supervisor?.Dispose(); } catch (Exception ex) { _logger.LogWarning(ex, "ReconnectSupervisor dispose failed"); }
try { _probeWatcher?.Dispose(); } catch (Exception ex) { _logger.LogWarning(ex, "ProbeWatcher dispose failed"); }
try { _transportForwarder?.Dispose(); } catch (Exception ex) { _logger.LogWarning(ex, "Transport forwarder dispose failed"); }
// Async sub-components: await each so we don't block a thread-pool thread
// on a slow shutdown (e.g. EventPump draining its channel, gRPC stream closing).
EventPump? pump;
lock (_pumpLock) { pump = _eventPump; _eventPump = null; }
pump?.DisposeAsync().AsTask().GetAwaiter().GetResult();
if (pump is not null)
{
try { await pump.DisposeAsync().ConfigureAwait(false); }
catch (Exception ex) { _logger.LogWarning(ex, "EventPump dispose failed"); }
}
IGalaxyAlarmFeed? alarmFeed;
lock (_alarmFeedLock) { alarmFeed = _alarmFeed; _alarmFeed = null; }
try { alarmFeed?.DisposeAsync().AsTask().GetAwaiter().GetResult(); }
catch (Exception ex) { _logger.LogWarning(ex, "Alarm feed dispose failed"); }
if (alarmFeed is not null)
{
try { await alarmFeed.DisposeAsync().ConfigureAwait(false); }
catch (Exception ex) { _logger.LogWarning(ex, "Alarm feed dispose failed"); }
}
_ownedMxSession?.DisposeAsync().AsTask().GetAwaiter().GetResult();
_ownedMxSession = null;
if (_ownedMxSession is not null)
{
try { await _ownedMxSession.DisposeAsync().ConfigureAwait(false); }
catch (Exception ex) { _logger.LogWarning(ex, "MxSession dispose failed"); }
_ownedMxSession = null;
}
_ownedMxClient?.DisposeAsync().AsTask().GetAwaiter().GetResult();
_ownedMxClient = null;
if (_ownedMxClient is not null)
{
try { await _ownedMxClient.DisposeAsync().ConfigureAwait(false); }
catch (Exception ex) { _logger.LogWarning(ex, "MxClient dispose failed"); }
_ownedMxClient = null;
}
if (_ownedRepositoryClient is not null)
{
try { await _ownedRepositoryClient.DisposeAsync().ConfigureAwait(false); }
catch (Exception ex) { _logger.LogWarning(ex, "RepositoryClient dispose failed"); }
_ownedRepositoryClient = null;
}
_ownedRepositoryClient?.DisposeAsync().AsTask().GetAwaiter().GetResult();
_ownedRepositoryClient = null;
_hierarchySource = null;
}
/// <summary>
/// Synchronous disposal. Prefer <see cref="DisposeAsync"/> in async contexts —
/// this path must block on every async sub-component shutdown. Provided for
/// compatibility with <c>using</c> statements that cannot <c>await</c>.
/// </summary>
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();
/// <summary>
/// Address-space builder wrapper that records each variable's
/// <see cref="DriverAttributeInfo.SecurityClass"/> into the supplied dictionary

View File

@@ -1,4 +1,5 @@
using Microsoft.Extensions.Logging;
using MxGateway.Client;
using MxGateway.Contracts.Proto;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
@@ -75,18 +76,20 @@ internal static class StatusCodeMap
};
/// <summary>
/// Map a gateway-reported <see cref="MxStatusProxy"/> to OPC UA StatusCode. Honors
/// the success flag, then the detail byte (treated as a quality substatus), with a
/// transport-error fallback for status rows whose detected_by indicates the failure
/// happened before the MXAccess call ran.
/// Map a gateway-reported <see cref="MxStatusProxy"/> to OPC UA StatusCode. Uses
/// <see cref="MxStatusProxyExtensions.IsSuccess"/> (category == OK AND success != 0)
/// as the canonical success test — the proto contract explicitly documents that
/// <c>success</c> is NOT a boolean and must not be checked in isolation; category is
/// the authoritative indicator. On failure, the detail byte (OPC DA quality substatus)
/// drives the specific code, with a transport-error fallback for pre-MXAccess failures.
/// </summary>
public static uint FromMxStatus(MxStatusProxy? status, ILogger? logger = null)
{
if (status is null) return Good;
if (status.Success != 0) return Good;
if (status.IsSuccess()) return Good;
// Detail field carries the substatus when the worker translated MX-style codes;
// when zero, infer from category + detected_by.
// when zero, infer from detected_by.
var detail = (byte)(status.Detail & 0xFF);
if (detail != 0) return FromQualityByte(detail, logger);
@@ -98,6 +101,25 @@ internal static class StatusCodeMap
return Bad;
}
/// <summary>
/// Convert an OPC UA status-code uint back to the OPC DA quality category
/// byte — Good=192, Uncertain=64, Bad=0 — by extracting the top-two bits of the
/// high word. This is the inverse of the category-bucket arm of
/// <see cref="FromQualityByte"/>. It is intentionally lossy (substatus bits are not
/// round-tripped) because the sole consumer
/// (<see cref="ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Health.PerPlatformProbeWatcher"/>)
/// only tests <c>qualityByte &lt; 192</c> to distinguish Running from Stopped. Keeping
/// the round-trip in one place means a future change to the OPC UA bit layout cannot
/// silently desync the probe-health decode.
/// </summary>
public static byte ToQualityCategoryByte(uint statusCode) =>
(byte)(((statusCode >> 30) & 0x3u) switch
{
0u => 192u, // Good — top two bits 00b → OPC DA 0xC0
1u => 64u, // Uncertain — top two bits 01b → OPC DA 0x40
_ => 0u, // Bad — top two bits 10b/11b → OPC DA 0x00
});
private static uint Categorize(byte q, ILogger? logger)
{
if (q >= 192) { Log(logger, q, "Good"); return Good; }