Track Galaxy Platform and AppEngine runtime state via ScanState probes, and proactively invalidate descendant variable quality on Stopped transitions. This lets operators detect a stopped runtime host before downstream clients read stale data, and lets the bridge deliver a uniform bad-quality signal instead of relying on MxAccess per-tag fan-out.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -10,6 +10,7 @@ using Serilog;
|
||||
using ZB.MOM.WW.LmxOpcUa.Host.Domain;
|
||||
using ZB.MOM.WW.LmxOpcUa.Host.Historian;
|
||||
using ZB.MOM.WW.LmxOpcUa.Host.Metrics;
|
||||
using ZB.MOM.WW.LmxOpcUa.Host.MxAccess;
|
||||
|
||||
namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
{
|
||||
@@ -32,6 +33,19 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
private readonly AlarmObjectFilter? _alarmObjectFilter;
|
||||
private int _alarmFilterIncludedObjectCount;
|
||||
private readonly bool _anonymousCanWrite;
|
||||
|
||||
// Host → list of OPC UA variable nodes transitively hosted by that host. Populated during
|
||||
// BuildAddressSpace by walking each variable's owning object's hosted_by_gobject_id chain
|
||||
// and registering it under every $WinPlatform or $AppEngine found on that chain. A variable that lives under a nested host
|
||||
// (e.g. a user object under an Engine under a Platform) appears in BOTH the Engine's and
|
||||
// the Platform's list. Used by MarkHostVariablesBadQuality / ClearHostVariablesBadQuality
|
||||
// when the galaxy runtime probe reports a host transition.
|
||||
private readonly Dictionary<int, List<BaseDataVariableState>> _hostedVariables =
|
||||
new Dictionary<int, List<BaseDataVariableState>>();
|
||||
|
||||
// Runtime status probe manager — null when MxAccessConfiguration.RuntimeStatusProbesEnabled
|
||||
// is false. Built at construction time and synced to the hierarchy on every BuildAddressSpace.
|
||||
private readonly GalaxyRuntimeProbeManager? _galaxyRuntimeProbeManager;
|
||||
private readonly AutoResetEvent _dataChangeSignal = new(false);
|
||||
private readonly Dictionary<int, List<string>> _gobjectToTagRefs = new();
|
||||
private readonly HistoryContinuationPointManager _historyContinuations = new();
|
||||
@@ -106,7 +120,9 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
NodeId? writeTuneRoleId = null,
|
||||
NodeId? writeConfigureRoleId = null,
|
||||
NodeId? alarmAckRoleId = null,
|
||||
AlarmObjectFilter? alarmObjectFilter = null)
|
||||
AlarmObjectFilter? alarmObjectFilter = null,
|
||||
bool runtimeStatusProbesEnabled = false,
|
||||
int runtimeStatusUnknownTimeoutSeconds = 15)
|
||||
: base(server, configuration, namespaceUri)
|
||||
{
|
||||
_namespaceUri = namespaceUri;
|
||||
@@ -121,6 +137,15 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
_writeConfigureRoleId = writeConfigureRoleId;
|
||||
_alarmAckRoleId = alarmAckRoleId;
|
||||
|
||||
if (runtimeStatusProbesEnabled)
|
||||
{
|
||||
_galaxyRuntimeProbeManager = new GalaxyRuntimeProbeManager(
|
||||
_mxAccessClient,
|
||||
runtimeStatusUnknownTimeoutSeconds,
|
||||
MarkHostVariablesBadQuality,
|
||||
ClearHostVariablesBadQuality);
|
||||
}
|
||||
|
||||
// Wire up data change delivery
|
||||
_mxAccessClient.OnTagValueChanged += OnMxAccessDataChange;
|
||||
|
||||
@@ -190,6 +215,21 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
public IReadOnlyList<string> AlarmFilterPatterns =>
|
||||
_alarmObjectFilter?.RawPatterns ?? Array.Empty<string>();
|
||||
|
||||
/// <summary>
|
||||
/// Gets a snapshot of the runtime host states (Platforms + AppEngines). Returns an empty
|
||||
/// list when runtime status probing is disabled. The snapshot respects MxAccess transport
|
||||
/// state — when the client is disconnected, every entry is returned as
|
||||
/// <see cref="GalaxyRuntimeState.Unknown"/>.
|
||||
/// </summary>
|
||||
public IReadOnlyList<GalaxyRuntimeStatus> RuntimeStatuses
{
    get
    {
        // No probe manager (probing disabled) or a null snapshot both collapse to an empty list.
        IReadOnlyList<GalaxyRuntimeStatus>? snapshot = _galaxyRuntimeProbeManager?.GetSnapshot();
        return snapshot ?? Array.Empty<GalaxyRuntimeStatus>();
    }
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets the number of bridge-owned runtime status probe subscriptions. Surfaced on the
|
||||
/// dashboard Subscriptions panel to distinguish probe overhead from client subscriptions.
|
||||
/// </summary>
|
||||
public int ActiveRuntimeProbeCount
{
    get
    {
        // Reports zero when the probe manager was never created (probing disabled).
        var probeCount = _galaxyRuntimeProbeManager?.ActiveProbeCount;
        return probeCount ?? 0;
    }
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets the runtime historian health snapshot, or <see langword="null"/> when the historian
|
||||
/// plugin is not loaded. Surfaced on the status dashboard so operators can detect query
|
||||
@@ -261,6 +301,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
_alarmDescTags.Clear();
|
||||
_nodeMap.Clear();
|
||||
_gobjectToTagRefs.Clear();
|
||||
_hostedVariables.Clear();
|
||||
VariableNodeCount = 0;
|
||||
ObjectNodeCount = 0;
|
||||
|
||||
@@ -464,12 +505,20 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
if (_alarmTrackingEnabled)
|
||||
SubscribeAlarmTags();
|
||||
|
||||
BuildHostedVariablesMap(hierarchy);
|
||||
|
||||
// Sync the galaxy runtime probe set against the rebuilt hierarchy. This runs
|
||||
// synchronously on the calling thread and issues AdviseSupervisory per host —
|
||||
// expected 500ms-1s additional startup latency for a large multi-host galaxy.
|
||||
_galaxyRuntimeProbeManager?.SyncAsync(hierarchy).GetAwaiter().GetResult();
|
||||
|
||||
_lastHierarchy = new List<GalaxyObjectInfo>(hierarchy);
|
||||
_lastAttributes = new List<GalaxyAttributeInfo>(attributes);
|
||||
|
||||
Log.Information(
|
||||
"Address space built: {Objects} objects, {Variables} variables, {Mappings} tag references, {Alarms} alarm tags",
|
||||
ObjectNodeCount, VariableNodeCount, _nodeIdToTagReference.Count, _alarmInAlarmTags.Count);
|
||||
"Address space built: {Objects} objects, {Variables} variables, {Mappings} tag references, {Alarms} alarm tags, {Hosts} runtime hosts",
|
||||
ObjectNodeCount, VariableNodeCount, _nodeIdToTagReference.Count, _alarmInAlarmTags.Count,
|
||||
_hostedVariables.Count);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -499,6 +548,120 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
return includedIds;
|
||||
}
|
||||
|
||||
/// <summary>
///     Rebuilds <c>_hostedVariables</c> from the finished address space. Each Galaxy object that
///     owns at least one variable node is followed along its <c>HostedByGobjectId</c> links, and
///     its variables are appended to the list of every runtime host (<c>$WinPlatform</c> /
///     <c>$AppEngine</c>) met on the way, the object itself included when it is such a host.
///     An object under an Engine under a Platform is therefore registered with BOTH hosts, so a
///     Platform transition also reaches the variables of the Engines it hosts.
/// </summary>
/// <param name="hierarchy">
///     Galaxy objects of the current build. A null or empty list leaves the map empty.
/// </param>
private void BuildHostedVariablesMap(List<GalaxyObjectInfo> hierarchy)
{
    // Upper bound on the number of objects inspected per hosting chain.
    const int MaxChainLength = 32;

    // Category ids treated as runtime hosts ($WinPlatform / $AppEngine).
    static bool IsRuntimeHost(GalaxyObjectInfo candidate) =>
        candidate.CategoryId == 1 || candidate.CategoryId == 3;

    _hostedVariables.Clear();
    if (hierarchy == null || hierarchy.Count == 0)
    {
        return;
    }

    // Index the hierarchy by gobject id so hosting links can be resolved in O(1).
    var objectsById = new Dictionary<int, GalaxyObjectInfo>(hierarchy.Count);
    foreach (var entry in hierarchy)
    {
        objectsById[entry.GobjectId] = entry;
    }

    foreach (var owner in hierarchy)
    {
        if (!_gobjectToTagRefs.TryGetValue(owner.GobjectId, out var ownerTags) || ownerTags.Count == 0)
        {
            continue;
        }

        // Resolve the owner's tag references to the variable nodes that were actually created.
        var ownerNodes = new List<BaseDataVariableState>(ownerTags.Count);
        foreach (var tag in ownerTags)
        {
            if (_tagToVariableNode.TryGetValue(tag, out var node))
            {
                ownerNodes.Add(node);
            }
        }

        if (ownerNodes.Count == 0)
        {
            continue;
        }

        // Climb the hosting chain. The seen-set stops a cyclic chain from looping, and the
        // length cap bounds the walk regardless.
        var seen = new HashSet<int>();
        var current = owner;
        for (var hops = 0;
             hops < MaxChainLength && current != null && seen.Add(current.GobjectId);
             hops++)
        {
            if (IsRuntimeHost(current))
            {
                if (!_hostedVariables.TryGetValue(current.GobjectId, out var hostNodes))
                {
                    hostNodes = new List<BaseDataVariableState>();
                    _hostedVariables[current.GobjectId] = hostNodes;
                }

                hostNodes.AddRange(ownerNodes);
            }

            // A zero or unresolvable host id marks the top of the chain.
            var parentId = current.HostedByGobjectId;
            if (parentId == 0 || !objectsById.TryGetValue(parentId, out var parent))
            {
                break;
            }

            current = parent;
        }
    }
}
|
||||
|
||||
// Gobject ids of runtime hosts last reported as stopped through MarkHostVariablesBadQuality and
// not yet cleared through ClearHostVariablesBadQuality. Guarded by Lock. Needed because a
// variable is listed under EVERY enclosing host in _hostedVariables: when one host recovers,
// variables that also sit under another still-stopped host must stay bad.
private readonly HashSet<int> _stoppedHostIds = new HashSet<int>();

/// <summary>
///     Flips every OPC UA variable hosted by the given Galaxy runtime object (Platform or
///     AppEngine) to <see cref="StatusCodes.BadOutOfService"/> and records the host as stopped.
///     Handed to the runtime probe manager as its host-stopped callback. An id with no hosted
///     variables only records the stopped state; no variable is touched and nothing is logged.
/// </summary>
/// <param name="gobjectId">The runtime host's gobject_id.</param>
public void MarkHostVariablesBadQuality(int gobjectId)
{
    int marked;
    lock (Lock)
    {
        _stoppedHostIds.Add(gobjectId);

        if (!_hostedVariables.TryGetValue(gobjectId, out var variables))
            return;

        var now = DateTime.UtcNow;
        foreach (var variable in variables)
            ApplyHostQuality(variable, StatusCodes.BadOutOfService, now);

        // Capture the count while still holding the lock that guards the map.
        marked = variables.Count;
    }

    Log.Information(
        "Marked {Count} variable(s) BadOutOfService for stopped host gobject_id={GobjectId}",
        marked, gobjectId);
}

/// <summary>
///     Resets the OPC UA variables hosted by the given Galaxy runtime object to
///     <see cref="StatusCodes.Good"/> and forgets the host's stopped state. Handed to the runtime
///     probe manager as its host-recovered callback. Values are left as-is. Variables that are
///     also hosted by another runtime object still recorded as stopped (for example an Engine
///     that is still down when its Platform recovers) are left untouched, so they keep their bad
///     quality until that host is cleared as well.
/// </summary>
/// <param name="gobjectId">The runtime host's gobject_id.</param>
public void ClearHostVariablesBadQuality(int gobjectId)
{
    var restored = 0;
    var withheld = 0;
    lock (Lock)
    {
        _stoppedHostIds.Remove(gobjectId);

        if (!_hostedVariables.TryGetValue(gobjectId, out var variables))
            return;

        // Collect the variables still covered by some other stopped host. Allocated lazily
        // because the common case is that no other host is stopped.
        HashSet<BaseDataVariableState>? stillOutOfService = null;
        foreach (var stoppedId in _stoppedHostIds)
        {
            if (!_hostedVariables.TryGetValue(stoppedId, out var covered))
                continue;

            stillOutOfService ??= new HashSet<BaseDataVariableState>();
            stillOutOfService.UnionWith(covered);
        }

        var now = DateTime.UtcNow;
        foreach (var variable in variables)
        {
            if (stillOutOfService != null && stillOutOfService.Contains(variable))
            {
                withheld++;
                continue;
            }

            ApplyHostQuality(variable, StatusCodes.Good, now);
            restored++;
        }
    }

    Log.Information(
        "Cleared bad-quality override on {Count} variable(s) for recovered host gobject_id={GobjectId}",
        restored, gobjectId);

    if (withheld > 0)
        Log.Information(
            "Left {Count} variable(s) BadOutOfService for recovered host gobject_id={GobjectId} because another hosting runtime object is still stopped",
            withheld, gobjectId);
}

/// <summary>
///     Stamps one variable node with the given status code and source timestamp, then clears its
///     change masks so the update is handed to the SDK. Callers must hold <c>Lock</c>.
/// </summary>
/// <param name="variable">The variable node to update.</param>
/// <param name="statusCode">The status code to assign to the node.</param>
/// <param name="timestampUtc">The UTC timestamp to assign to the node.</param>
private void ApplyHostQuality(BaseDataVariableState variable, uint statusCode, DateTime timestampUtc)
{
    variable.StatusCode = statusCode;
    variable.Timestamp = timestampUtc;
    variable.ClearChangeMasks(SystemContext, false);
}
|
||||
|
||||
private void SubscribeAlarmTags()
|
||||
{
|
||||
foreach (var kvp in _alarmInAlarmTags)
|
||||
@@ -2116,6 +2279,14 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
if (_dispatchDisposed)
|
||||
return;
|
||||
|
||||
// Runtime status probes are bridge-owned subscriptions whose only job is to drive the
|
||||
// host state machine; they are NOT in _tagToVariableNode, so the normal dispatch path
|
||||
// would drop them anyway. Route probe addresses directly to the probe manager and skip
|
||||
// the dispatch queue entirely.
|
||||
if (_galaxyRuntimeProbeManager != null
|
||||
&& _galaxyRuntimeProbeManager.HandleProbeUpdate(address, vtq))
|
||||
return;
|
||||
|
||||
Interlocked.Increment(ref _totalMxChangeEvents);
|
||||
_pendingDataChanges[address] = vtq;
|
||||
try
|
||||
@@ -2162,6 +2333,12 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
if (!_dispatchRunning)
|
||||
break;
|
||||
|
||||
// Drive time-based probe state transitions on every dispatch tick. The dispatch
|
||||
// loop already wakes every 100ms via the WaitOne timeout, so this gives us a
|
||||
// ~10Hz cadence for the Unknown → Stopped timeout without introducing a new
|
||||
// thread or timer. No-op when the probe manager is disabled.
|
||||
_galaxyRuntimeProbeManager?.Tick();
|
||||
|
||||
var keys = _pendingDataChanges.Keys.ToList();
|
||||
if (keys.Count == 0)
|
||||
{
|
||||
@@ -2376,6 +2553,11 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
{
|
||||
_dispatchDisposed = true;
|
||||
_mxAccessClient.OnTagValueChanged -= OnMxAccessDataChange;
|
||||
// Dispose the runtime probe manager before the MxAccess client teardown so its
|
||||
// Unadvise calls reach a live client. Disposing the node manager normally runs
|
||||
// BEFORE the node manager's containing OpcUaServerHost releases the MxAccess
|
||||
// client, so the probes close cleanly.
|
||||
_galaxyRuntimeProbeManager?.Dispose();
|
||||
StopDispatchThread();
|
||||
_dataChangeSignal.Dispose();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user