Files
lmxopcua/src/ZB.MOM.WW.OtOpcUa.Host/OpcUa/LmxNodeManager.cs
Joseph Doherty 3b2defd94f Phase 0 — mechanical rename ZB.MOM.WW.LmxOpcUa.* → ZB.MOM.WW.OtOpcUa.*
Renames all 11 projects (5 src + 6 tests), the .slnx solution file, all source-file namespaces, all axaml namespace references, and all v1 documentation references in CLAUDE.md and docs/*.md (excluding docs/v2/ which is already in OtOpcUa form). Also updates the TopShelf service registration name from "LmxOpcUa" to "OtOpcUa" per Phase 0 Task 0.6.

Preserves runtime identifiers per Phase 0 Out-of-Scope rules to avoid breaking v1/v2 client trust during coexistence: OPC UA `ApplicationUri` defaults (`urn:{GalaxyName}:LmxOpcUa`), server `EndpointPath` (`/LmxOpcUa`), `ServerName` default (feeds cert subject CN), `MxAccessConfiguration.ClientName` default (defensive — stays "LmxOpcUa" for MxAccess audit-trail consistency), client OPC UA identifiers (`ApplicationName = "LmxOpcUaClient"`, `ApplicationUri = "urn:localhost:LmxOpcUaClient"`, cert directory `%LocalAppData%\LmxOpcUaClient\pki\`), and the `LmxOpcUaServer` class name (class rename out of Phase 0 scope per Task 0.5 sed pattern; happens in Phase 1 alongside `LmxNodeManager → GenericDriverNodeManager` Core extraction). 23 LmxOpcUa references retained, all enumerated and justified in `docs/v2/implementation/exit-gate-phase-0.md`.

Build clean: 0 errors, 30 warnings (lower than baseline 167). Tests at strict improvement over baseline: 821 passing / 1 failing vs baseline 820 / 2 (one flaky pre-existing failure passed this run; the other still fails — both pre-existing and unrelated to the rename). `Client.UI.Tests`, `Historian.Aveva.Tests`, `Client.Shared.Tests`, `IntegrationTests` all match baseline exactly. Exit gate compliance results recorded in `docs/v2/implementation/exit-gate-phase-0.md` with all 7 checks PASS or DEFERRED-to-PR-review (#7 service install verification needs Windows service permissions on the reviewer's box).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-17 13:57:47 -04:00

2924 lines
135 KiB
C#

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Opc.Ua;
using Opc.Ua.Server;
using Serilog;
using ZB.MOM.WW.OtOpcUa.Host.Domain;
using ZB.MOM.WW.OtOpcUa.Host.Historian;
using ZB.MOM.WW.OtOpcUa.Host.Metrics;
using ZB.MOM.WW.OtOpcUa.Host.MxAccess;
using ZB.MOM.WW.OtOpcUa.Host.Utilities;
namespace ZB.MOM.WW.OtOpcUa.Host.OpcUa
{
/// <summary>
/// Custom node manager that builds the OPC UA address space from Galaxy hierarchy data.
/// (OPC-002 through OPC-013)
/// </summary>
public class LmxNodeManager : CustomNodeManager2
{
// Class-scoped Serilog logger.
private static readonly ILogger Log = Serilog.Log.ForContext<LmxNodeManager>();
// Alarm tracking: maps the ".Acked" tag reference of each tracked alarm → alarm info.
// Cleared and repopulated by BuildAddressSpace alongside _alarmInAlarmTags.
private readonly Dictionary<string, AlarmInfo> _alarmAckedTags = new(StringComparer.OrdinalIgnoreCase);
// Optional role NodeId supplied through the constructor (alarmAckRoleId); null when not configured.
// NOTE(review): presumably checked by the alarm-acknowledge handler, which is outside this chunk — confirm.
private readonly NodeId? _alarmAckRoleId;
// Alarm tracking: maps InAlarm tag reference → alarm source info
private readonly Dictionary<string, AlarmInfo> _alarmInAlarmTags = new(StringComparer.OrdinalIgnoreCase);
// Reverse lookups: priority/description tag reference → alarm info for cache updates
private readonly Dictionary<string, AlarmInfo> _alarmPriorityTags = new(StringComparer.OrdinalIgnoreCase);
private readonly Dictionary<string, AlarmInfo> _alarmDescTags = new(StringComparer.OrdinalIgnoreCase);
// When false, BuildAddressSpace creates no AlarmConditionState nodes and does not auto-subscribe alarm tags.
private readonly bool _alarmTrackingEnabled;
// Optional template-based filter restricting which objects contribute alarm conditions;
// null or disabled means every object is considered.
private readonly AlarmObjectFilter? _alarmObjectFilter;
// Number of objects included by the alarm filter during the most recent build (0 when no filter is active).
private int _alarmFilterIncludedObjectCount;
// Constructor-supplied write setting for anonymous sessions (defaults to true).
// NOTE(review): enforcement lives in the write path outside this chunk — confirm semantics there.
private readonly bool _anonymousCanWrite;
// Host → list of OPC UA variable nodes transitively hosted by that host. Populated during
// BuildAddressSpace by walking each variable's owning object's hosted_by_gobject_id chain
// up to the nearest $WinPlatform or $AppEngine. A variable that lives under a nested host
// (e.g. a user object under an Engine under a Platform) appears in BOTH the Engine's and
// the Platform's list. Used by MarkHostVariablesBadQuality / ClearHostVariablesBadQuality
// when the galaxy runtime probe reports a host transition.
private readonly Dictionary<int, List<BaseDataVariableState>> _hostedVariables =
new Dictionary<int, List<BaseDataVariableState>>();
// Tag reference → list of owning host gobject_ids (typically Engine + Platform). Populated
// alongside _hostedVariables during BuildAddressSpace. Used by the Read path to short-circuit
// on-demand reads of tags under a Stopped runtime host — preventing MxAccess from returning
// stale "Good" cached values. Multiple tag refs on the same Galaxy object share the same
// host-id list by reference (safe because the list is read-only after build).
private readonly Dictionary<string, List<int>> _hostIdsByTagRef =
new Dictionary<string, List<int>>(StringComparer.OrdinalIgnoreCase);
// Runtime status probe manager — null when MxAccessConfiguration.RuntimeStatusProbesEnabled
// is false. Built at construction time and synced to the hierarchy on every BuildAddressSpace.
private readonly GalaxyRuntimeProbeManager? _galaxyRuntimeProbeManager;
// Queue of host runtime state transitions deferred from the probe callback (which runs on
// the MxAccess STA thread) to the dispatch thread, where the node manager Lock can be taken
// safely. Enqueue → signal dispatch → dispatch thread drains and calls Mark/Clear under Lock.
// Required because invoking Mark/Clear directly from the STA callback deadlocks against any
// worker thread currently inside Read waiting for an MxAccess round-trip.
private readonly ConcurrentQueue<(int GobjectId, bool Stopped)> _pendingHostStateChanges =
new ConcurrentQueue<(int, bool)>();
// Synthetic $-prefixed OPC UA child variables exposed under each $WinPlatform / $AppEngine
// object so clients can subscribe to runtime state changes without polling the dashboard.
// Populated during BuildAddressSpace and updated from the dispatch-thread queue drain
// alongside Mark/Clear, using the same deadlock-safe path.
private readonly Dictionary<int, HostRuntimeStatusNodes> _runtimeStatusNodes =
new Dictionary<int, HostRuntimeStatusNodes>();
// Holder for the six bridge-owned synthetic variables created under one $WinPlatform /
// $AppEngine object. Populated by the object initializer in CreateHostRuntimeStatusNodes;
// the `null!` initializers only silence nullable warnings until that assignment happens.
private sealed class HostRuntimeStatusNodes
{
// "$RuntimeState" (String, initially "Unknown"); refreshed from the probe status State.
public BaseDataVariableState RuntimeState = null!;
// "$LastCallbackTime" (DateTime, initially DateTime.MinValue).
public BaseDataVariableState LastCallbackTime = null!;
// "$LastScanState" (Boolean, initially false).
public BaseDataVariableState LastScanState = null!;
// "$LastStateChangeTime" (DateTime, initially DateTime.MinValue).
public BaseDataVariableState LastStateChangeTime = null!;
// "$FailureCount" (Int64, initially 0).
public BaseDataVariableState FailureCount = null!;
// "$LastError" (String, initially empty).
public BaseDataVariableState LastError = null!;
}
// Wakes the background dispatch thread. Set by the runtime-probe transition callbacks
// (and presumably by the MxAccess data-change handler outside this chunk — confirm).
private readonly AutoResetEvent _dataChangeSignal = new(false);
// gobject_id → tag references owned by that object. Cleared in BuildAddressSpace and read by
// BuildHostedVariablesMap to find each object's variable nodes.
private readonly Dictionary<int, List<string>> _gobjectToTagRefs = new();
// Continuation-point store for history reads (used outside this chunk).
private readonly HistoryContinuationPointManager _historyContinuations = new();
// Optional historian adapter; null when the historian plugin is not loaded (HistorianHealth is then null).
private readonly IHistorianDataSource? _historianDataSource;
// Metrics collector supplied by the host.
private readonly PerformanceMetrics _metrics;
// Runtime client used for reads, writes, subscriptions and runtime-status probes.
private readonly IMxAccessClient _mxAccessClient;
// Namespace URI also passed to the base node manager.
private readonly string _namespaceUri;
// NodeId → full_tag_reference for read/write resolution
private readonly Dictionary<string, string> _nodeIdToTagReference = new(StringComparer.OrdinalIgnoreCase);
// Incremental sync: persistent node map and reverse lookup
// (_nodeMap: gobject_id → folder/object NodeState created by BuildAddressSpace).
private readonly Dictionary<int, NodeState> _nodeMap = new();
// Data change dispatch queue: decouples MXAccess STA callbacks from OPC UA framework Lock
private readonly ConcurrentDictionary<string, Vtq> _pendingDataChanges = new(StringComparer.OrdinalIgnoreCase);
// Ref-counted MXAccess subscriptions
private readonly Dictionary<string, int> _subscriptionRefCounts = new(StringComparer.OrdinalIgnoreCase);
// Per-tag metadata keyed by tag reference; cleared on every BuildAddressSpace.
private readonly Dictionary<string, TagMetadata> _tagMetadata = new(StringComparer.OrdinalIgnoreCase);
// Tag reference → OPC UA variable node. Used to locate alarm source / InAlarm variables and
// to build the host → hosted-variables map.
private readonly Dictionary<string, BaseDataVariableState> _tagToVariableNode =
new(StringComparer.OrdinalIgnoreCase);
// Optional role NodeIds for the Configure / Operate / Tune write tiers, supplied by the
// constructor; null when not configured. Enforcement is outside this chunk.
private readonly NodeId? _writeConfigureRoleId;
private readonly NodeId? _writeOperateRoleId;
private readonly NodeId? _writeTuneRoleId;
// Request timeouts, clamped to at least one second in the constructor.
private readonly TimeSpan _mxAccessRequestTimeout;
private readonly TimeSpan _historianRequestTimeout;
// Dispatch-loop counters (maintained outside this chunk).
private long _dispatchCycleCount;
private long _suppressedUpdatesCount;
// Dispatch thread lifecycle flags, declared volatile for cross-thread visibility.
private volatile bool _dispatchDisposed;
private volatile bool _dispatchRunning;
// Background dispatch thread started by the constructor via StartDispatchThread.
private Thread? _dispatchThread;
// External reference table captured in CreateAddressSpace.
private IDictionary<NodeId, IList<IReference>>? _externalReferences;
// Copies of the inputs to the most recently completed BuildAddressSpace call.
private List<GalaxyAttributeInfo>? _lastAttributes;
private List<GalaxyObjectInfo>? _lastHierarchy;
// Metrics-report bookkeeping (used outside this chunk; presumably feeds
// MxChangeEventsPerSecond / AverageDispatchBatchSize — confirm).
private DateTime _lastMetricsReportTime = DateTime.UtcNow;
private long _lastReportedMxChangeEvents;
private long _totalDispatchBatchSize;
// Dispatch queue metrics
private long _totalMxChangeEvents;
// Alarm instrumentation counters
private long _alarmTransitionCount;
private long _alarmAckEventCount;
private long _alarmAckWriteFailures;
// Background subscribe tracking: every fire-and-forget SubscribeAsync for alarm auto-subscribe
// and transferred-subscription restore is registered here so shutdown can drain pending work
// with a bounded timeout, and so tests can observe pending count without races.
private readonly ConcurrentDictionary<long, Task> _pendingBackgroundSubscribes =
new ConcurrentDictionary<long, Task>();
// Key source for _pendingBackgroundSubscribes entries (presumably incremented atomically — confirm).
private long _backgroundSubscribeCounter;
/// <summary>
/// Initializes a new node manager for the Galaxy-backed OPC UA namespace, wires MxAccess data
/// change delivery, and starts the background dispatch thread.
/// </summary>
/// <param name="server">The hosting OPC UA server internals.</param>
/// <param name="configuration">The OPC UA application configuration for the host.</param>
/// <param name="namespaceUri">The namespace URI that identifies the Galaxy model to clients.</param>
/// <param name="mxAccessClient">The runtime client used to service reads, writes, and subscriptions.</param>
/// <param name="metrics">The metrics collector used to track node manager activity.</param>
/// <param name="historianDataSource">The optional historian adapter used to satisfy OPC UA history read requests.</param>
/// <param name="alarmTrackingEnabled">Enables alarm-condition state generation for Galaxy attributes modeled as alarms.</param>
/// <param name="anonymousCanWrite">Write-authorization setting for anonymous sessions, stored for the write path.</param>
/// <param name="writeOperateRoleId">Optional role required for Operate-tier writes.</param>
/// <param name="writeTuneRoleId">Optional role required for Tune-tier writes.</param>
/// <param name="writeConfigureRoleId">Optional role required for Configure-tier writes.</param>
/// <param name="alarmAckRoleId">Optional role required to acknowledge alarms.</param>
/// <param name="alarmObjectFilter">Optional template-based object filter. When supplied and enabled, only Galaxy
/// objects whose template derivation chain matches a pattern (and their descendants) contribute alarm conditions.
/// A <see langword="null"/> or disabled filter preserves the current unfiltered behavior.</param>
/// <param name="runtimeStatusProbesEnabled">When <see langword="true"/>, creates a runtime probe manager whose
/// host Running/Stopped transitions are queued to the dispatch thread.</param>
/// <param name="runtimeStatusUnknownTimeoutSeconds">Passed through to the runtime probe manager.</param>
/// <param name="mxAccessRequestTimeoutSeconds">Upper bound for blocking MxAccess round-trips such as the probe
/// sync in <c>BuildAddressSpace</c>; clamped to at least one second.</param>
/// <param name="historianRequestTimeoutSeconds">Upper bound for historian requests; clamped to at least one second.</param>
/// <exception cref="ArgumentNullException"><paramref name="mxAccessClient"/> is <see langword="null"/>.</exception>
public LmxNodeManager(
    IServerInternal server,
    ApplicationConfiguration configuration,
    string namespaceUri,
    IMxAccessClient mxAccessClient,
    PerformanceMetrics metrics,
    IHistorianDataSource? historianDataSource = null,
    bool alarmTrackingEnabled = false,
    bool anonymousCanWrite = true,
    NodeId? writeOperateRoleId = null,
    NodeId? writeTuneRoleId = null,
    NodeId? writeConfigureRoleId = null,
    NodeId? alarmAckRoleId = null,
    AlarmObjectFilter? alarmObjectFilter = null,
    bool runtimeStatusProbesEnabled = false,
    int runtimeStatusUnknownTimeoutSeconds = 15,
    int mxAccessRequestTimeoutSeconds = 30,
    int historianRequestTimeoutSeconds = 60)
    : base(server, configuration, namespaceUri)
{
    // Fail fast with a descriptive exception: the probe manager, the data change subscription
    // and the dispatch path all depend on the client, and a null here would otherwise surface
    // later as a NullReferenceException at the event subscription below.
    _mxAccessClient = mxAccessClient ?? throw new ArgumentNullException(nameof(mxAccessClient));
    _namespaceUri = namespaceUri;
    _metrics = metrics;
    _historianDataSource = historianDataSource;
    _alarmTrackingEnabled = alarmTrackingEnabled;
    _alarmObjectFilter = alarmObjectFilter;
    _anonymousCanWrite = anonymousCanWrite;
    _writeOperateRoleId = writeOperateRoleId;
    _writeTuneRoleId = writeTuneRoleId;
    _writeConfigureRoleId = writeConfigureRoleId;
    _alarmAckRoleId = alarmAckRoleId;
    _mxAccessRequestTimeout = TimeSpan.FromSeconds(Math.Max(1, mxAccessRequestTimeoutSeconds));
    _historianRequestTimeout = TimeSpan.FromSeconds(Math.Max(1, historianRequestTimeoutSeconds));

    if (runtimeStatusProbesEnabled)
    {
        // Probe transition callbacks are deferred through a concurrent queue onto the
        // dispatch thread — they cannot run synchronously from the STA callback thread
        // because MarkHostVariablesBadQuality needs the node manager Lock, which may be
        // held by a worker thread waiting on an MxAccess round-trip.
        _galaxyRuntimeProbeManager = new GalaxyRuntimeProbeManager(
            _mxAccessClient,
            runtimeStatusUnknownTimeoutSeconds,
            gobjectId => EnqueueHostStateChange(gobjectId, true),
            gobjectId => EnqueueHostStateChange(gobjectId, false));
    }

    // Wire up data change delivery
    _mxAccessClient.OnTagValueChanged += OnMxAccessDataChange;

    // Start background dispatch thread
    StartDispatchThread();

    // Queues a host transition (stopped = true for Running → Stopped) and wakes the dispatch thread.
    void EnqueueHostStateChange(int hostGobjectId, bool stopped)
    {
        _pendingHostStateChanges.Enqueue((hostGobjectId, stopped));

        // The wake-up is best-effort: the signal may already be disposed (presumably during
        // shutdown) while probe callbacks are still arriving.
        try { _dataChangeSignal.Set(); } catch (ObjectDisposedException) { }
    }
}
/// <summary>
/// Gets the OPC UA NodeId (string identifier) → Galaxy tag reference map used to resolve runtime reads and writes.
/// </summary>
/// <remarks>This is the live backing dictionary, which <c>BuildAddressSpace</c> rebuilds under the node manager lock.</remarks>
public IReadOnlyDictionary<string, string> NodeIdToTagReference
{
    get { return _nodeIdToTagReference; }
}

/// <summary>
/// Gets how many variable nodes are currently published from Galaxy attributes.
/// </summary>
public int VariableNodeCount { get; private set; }

/// <summary>
/// Gets how many non-area object nodes are currently published from the Galaxy hierarchy.
/// </summary>
public int ObjectNodeCount { get; private set; }

/// <summary>
/// Gets the running total of MXAccess data change events received since startup.
/// </summary>
public long TotalMxChangeEvents
{
    get { return Interlocked.Read(ref _totalMxChangeEvents); }
}

/// <summary>
/// Gets how many tag updates are waiting in the dispatch queue right now.
/// </summary>
public int PendingDataChangeCount
{
    get { return _pendingDataChanges.Count; }
}

/// <summary>
/// Gets the latest computed rate of MXAccess data change events, in events per second.
/// </summary>
public double MxChangeEventsPerSecond { get; private set; }

/// <summary>
/// Gets the latest computed average dispatch batch size, a proxy for queue depth under load.
/// </summary>
public double AverageDispatchBatchSize { get; private set; }

/// <summary>
/// Gets whether this node manager generates alarm condition state for Galaxy alarm attributes.
/// </summary>
public bool AlarmTrackingEnabled
{
    get { return _alarmTrackingEnabled; }
}

/// <summary>
/// Gets whether a template-based alarm object filter is configured and enabled.
/// </summary>
public bool AlarmFilterEnabled
{
    get { return _alarmObjectFilter is { Enabled: true }; }
}

/// <summary>
/// Gets how many alarm filter patterns were compiled; zero when no filter is configured.
/// </summary>
public int AlarmFilterPatternCount
{
    get { return _alarmObjectFilter?.PatternCount ?? 0; }
}

/// <summary>
/// Gets how many Galaxy objects the alarm filter included during the latest address-space build.
/// </summary>
public int AlarmFilterIncludedObjectCount
{
    get { return _alarmFilterIncludedObjectCount; }
}

/// <summary>
/// Gets the alarm filter patterns verbatim as configured, for the status dashboard.
/// An empty list is returned when no filter is active.
/// </summary>
public IReadOnlyList<string> AlarmFilterPatterns
{
    get { return _alarmObjectFilter?.RawPatterns ?? Array.Empty<string>(); }
}

/// <summary>
/// Gets a snapshot of runtime host states (Platforms + AppEngines). When runtime status probing
/// is disabled the list is empty. The snapshot follows MxAccess transport state: while the client
/// is disconnected every entry reports <see cref="GalaxyRuntimeState.Unknown"/>.
/// </summary>
public IReadOnlyList<GalaxyRuntimeStatus> RuntimeStatuses
{
    get
    {
        return _galaxyRuntimeProbeManager?.GetSnapshot()
            ?? (IReadOnlyList<GalaxyRuntimeStatus>)Array.Empty<GalaxyRuntimeStatus>();
    }
}

/// <summary>
/// Gets how many runtime status probe subscriptions the bridge itself owns, so the dashboard
/// Subscriptions panel can separate probe overhead from client subscriptions.
/// </summary>
public int ActiveRuntimeProbeCount
{
    get { return _galaxyRuntimeProbeManager?.ActiveProbeCount ?? 0; }
}

/// <summary>
/// Gets the runtime historian health snapshot, or <see langword="null"/> when no historian
/// plugin is loaded. The dashboard uses it to reveal query failures that the load-time
/// plugin status cannot detect.
/// </summary>
public HistorianHealthSnapshot? HistorianHealth
{
    get { return _historianDataSource?.GetHealthSnapshot(); }
}

/// <summary>
/// Gets how many distinct alarm conditions are tracked (one per alarm attribute).
/// </summary>
public int AlarmConditionCount
{
    get { return _alarmInAlarmTags.Count; }
}

/// <summary>
/// Gets how many tracked alarms are currently in the InAlarm=true state.
/// </summary>
public int ActiveAlarmCount
{
    get { return CountActiveAlarms(); }
}

/// <summary>
/// Gets the total InAlarm transition events observed by the dispatch loop since startup.
/// </summary>
public long AlarmTransitionCount
{
    get { return Interlocked.Read(ref _alarmTransitionCount); }
}

/// <summary>
/// Gets the total alarm acknowledgement transition events observed since startup.
/// </summary>
public long AlarmAckEventCount
{
    get { return Interlocked.Read(ref _alarmAckEventCount); }
}

/// <summary>
/// Gets the total MXAccess AckMsg writes that failed while handling alarm acknowledges.
/// </summary>
public long AlarmAckWriteFailures
{
    get { return Interlocked.Read(ref _alarmAckWriteFailures); }
}
/// <summary>
/// Counts the tracked alarm conditions whose last observed InAlarm value was true.
/// </summary>
/// <returns>The number of currently active alarms.</returns>
private int CountActiveAlarms()
{
    // BuildAddressSpace clears and repopulates _alarmInAlarmTags under this same lock,
    // so the enumeration must hold it too.
    lock (Lock)
    {
        return _alarmInAlarmTags.Values.Count(alarm => alarm.LastInAlarm);
    }
}
/// <inheritdoc />
/// <remarks>
/// Keeps a reference to the external reference table before delegating to the SDK base
/// implementation; both steps run under the node manager <c>Lock</c>.
/// </remarks>
public override void CreateAddressSpace(IDictionary<NodeId, IList<IReference>> externalReferences)
{
    lock (Lock)
    {
        _externalReferences = externalReferences;

        // Let the SDK base class perform its own address-space setup with the same table.
        base.CreateAddressSpace(_externalReferences);
    }
}
/// <summary>
/// Builds the address space from Galaxy hierarchy and attributes data. (OPC-002, OPC-003)
/// </summary>
/// <remarks>
/// Runs entirely under the node manager <c>Lock</c>. Every lookup table is cleared first. The method then:
/// (1) creates the <c>ZB</c> root folder and links it from the standard Objects folder;
/// (2) creates one folder (areas) or object node per Galaxy object in parent-first order, together with
/// its attribute variables and, for $WinPlatform/$AppEngine hosts when runtime probing is enabled, the
/// synthetic runtime-status variables;
/// (3) when alarm tracking is enabled, attaches an <see cref="AlarmConditionState"/> to every root-level
/// alarm attribute that has an <c>.InAlarm</c> variable, then auto-subscribes the alarm tags;
/// (4) rebuilds the host → hosted-variables map and syncs the runtime probe set, bounded by the
/// MxAccess request timeout.
/// </remarks>
/// <param name="hierarchy">The Galaxy object hierarchy that defines folders and objects in the namespace.</param>
/// <param name="attributes">The Galaxy attributes that become OPC UA variable nodes.</param>
public void BuildAddressSpace(List<GalaxyObjectInfo> hierarchy, List<GalaxyAttributeInfo> attributes)
{
    lock (Lock)
    {
        // Reset every lookup table; the model is rebuilt from scratch from the inputs.
        _nodeIdToTagReference.Clear();
        _tagToVariableNode.Clear();
        _tagMetadata.Clear();
        _alarmInAlarmTags.Clear();
        _alarmAckedTags.Clear();
        _alarmPriorityTags.Clear();
        _alarmDescTags.Clear();
        _nodeMap.Clear();
        _gobjectToTagRefs.Clear();
        _hostedVariables.Clear();
        _hostIdsByTagRef.Clear();
        _runtimeStatusNodes.Clear();
        VariableNodeCount = 0;
        ObjectNodeCount = 0;

        // Topological sort: ensure parents appear before children regardless of input order
        var sorted = TopologicalSort(hierarchy);

        // Build lookup: gobject_id → list of attributes
        var attrsByObject = attributes
            .GroupBy(a => a.GobjectId)
            .ToDictionary(g => g.Key, g => g.ToList());

        // Root folder — enable events so alarm events propagate to clients subscribed at root
        var rootFolder = CreateFolder(null, "ZB", "ZB");
        rootFolder.NodeId = new NodeId("ZB", NamespaceIndex);
        rootFolder.EventNotifier = EventNotifiers.SubscribeToEvents;
        rootFolder.AddReference(ReferenceTypeIds.Organizes, true, ObjectIds.ObjectsFolder);
        AddPredefinedNode(SystemContext, rootFolder);

        // Add reverse reference from Objects folder → ZB root.
        // BuildAddressSpace runs after CreateAddressSpace completes, so the
        // externalReferences dict has already been consumed by the core node manager.
        // Use MasterNodeManager.AddReferences to route the reference correctly.
        Server.NodeManager.AddReferences(ObjectIds.ObjectsFolder, new List<IReference>
        {
            new NodeStateReference(ReferenceTypeIds.Organizes, false, rootFolder.NodeId)
        });

        // Create nodes for each object in hierarchy
        foreach (var obj in sorted)
        {
            // Objects whose parent has no node yet (top-level or parent not in the hierarchy)
            // hang directly off the ZB root.
            NodeState parentNode;
            if (_nodeMap.TryGetValue(obj.ParentGobjectId, out var p))
                parentNode = p;
            else
                parentNode = rootFolder;

            NodeState node;
            if (obj.IsArea)
            {
                // Areas → FolderType + Organizes reference
                var folder = CreateFolder(parentNode, obj.BrowseName, obj.BrowseName);
                folder.NodeId = new NodeId(obj.TagName, NamespaceIndex);
                node = folder;
            }
            else
            {
                // Non-areas → BaseObjectType + HasComponent reference
                var objNode = CreateObject(parentNode, obj.BrowseName, obj.BrowseName);
                objNode.NodeId = new NodeId(obj.TagName, NamespaceIndex);
                node = objNode;
                ObjectNodeCount++;
            }

            AddPredefinedNode(SystemContext, node);
            _nodeMap[obj.GobjectId] = node;

            // Attach bridge-owned $RuntimeState / $LastCallbackTime / ... synthetic child
            // variables so OPC UA clients can subscribe to host state changes without
            // polling the dashboard. Only $WinPlatform (1) and $AppEngine (3) get them.
            if (_galaxyRuntimeProbeManager != null
                && (obj.CategoryId == 1 || obj.CategoryId == 3)
                && node is BaseObjectState hostObj)
            {
                _runtimeStatusNodes[obj.GobjectId] =
                    CreateHostRuntimeStatusNodes(hostObj, obj.TagName);
            }

            // Create variable nodes for this object's attributes
            if (attrsByObject.TryGetValue(obj.GobjectId, out var objAttrs))
            {
                // Group by primitive_name: empty = direct child, non-empty = sub-object.
                // Materialized once: the grouping is enumerated three times below, and a deferred
                // GroupBy/OrderBy query would re-group and re-sort on every enumeration.
                var byPrimitive = objAttrs
                    .GroupBy(a => a.PrimitiveName ?? "")
                    .OrderBy(g => g.Key)
                    .ToList();

                // Collect primitive group names so we know which direct attributes have children
                var primitiveGroupNames = new HashSet<string>(
                    byPrimitive.Select(g => g.Key).Where(k => !string.IsNullOrEmpty(k)),
                    StringComparer.OrdinalIgnoreCase);

                // Track variable nodes created for direct attributes that also have primitive children
                var variableNodes =
                    new Dictionary<string, BaseDataVariableState>(StringComparer.OrdinalIgnoreCase);

                // First pass: create direct (root-level) attribute variables
                var directGroup = byPrimitive.FirstOrDefault(g => string.IsNullOrEmpty(g.Key));
                if (directGroup != null)
                    foreach (var attr in directGroup)
                    {
                        var variable = CreateAttributeVariable(node, attr);
                        if (primitiveGroupNames.Contains(attr.AttributeName))
                            variableNodes[attr.AttributeName] = variable;
                    }

                // Second pass: add primitive child attributes under the matching variable node
                foreach (var group in byPrimitive)
                {
                    if (string.IsNullOrEmpty(group.Key))
                        continue;

                    NodeState parentForAttrs;
                    if (variableNodes.TryGetValue(group.Key, out var existingVariable))
                    {
                        // Merge: use the existing variable node as parent
                        parentForAttrs = existingVariable;
                    }
                    else
                    {
                        // No matching dynamic attribute — create an object node
                        var primNode = CreateObject(node, group.Key, group.Key);
                        primNode.NodeId = new NodeId(obj.TagName + "." + group.Key, NamespaceIndex);
                        AddPredefinedNode(SystemContext, primNode);
                        parentForAttrs = primNode;
                    }

                    foreach (var attr in group) CreateAttributeVariable(parentForAttrs, attr);
                }
            }
        }

        // Build alarm tracking: create AlarmConditionState for each alarm attribute
        if (_alarmTrackingEnabled)
        {
            var includedIds = ResolveAlarmFilterIncludedIds(sorted);
            foreach (var obj in sorted)
            {
                if (obj.IsArea) continue;
                if (includedIds != null && !includedIds.Contains(obj.GobjectId)) continue;
                if (!attrsByObject.TryGetValue(obj.GobjectId, out var objAttrs)) continue;

                var hasAlarms = false;
                var alarmAttrs = objAttrs.Where(a => a.IsAlarm && string.IsNullOrEmpty(a.PrimitiveName))
                    .ToList();
                foreach (var alarmAttr in alarmAttrs)
                {
                    // Trailing '[' / ']' characters (e.g. an array suffix "[]") are stripped once so
                    // every derived sub-attribute reference is built from the same base.
                    var baseTagRef = alarmAttr.FullTagReference.TrimEnd('[', ']');
                    var inAlarmTagRef = baseTagRef + ".InAlarm";

                    // Only attributes that actually published an InAlarm variable can be tracked.
                    if (!_tagToVariableNode.ContainsKey(inAlarmTagRef))
                        continue;

                    var alarmNodeIdStr = alarmAttr.FullTagReference.EndsWith("[]")
                        ? alarmAttr.FullTagReference.Substring(0, alarmAttr.FullTagReference.Length - 2)
                        : alarmAttr.FullTagReference;

                    // Find the source variable node for the alarm
                    _tagToVariableNode.TryGetValue(alarmAttr.FullTagReference, out var sourceVariable);
                    var sourceNodeId = new NodeId(alarmNodeIdStr, NamespaceIndex);

                    // Create AlarmConditionState attached to the source variable
                    var conditionNodeId = new NodeId(alarmNodeIdStr + ".Condition", NamespaceIndex);
                    var condition = new AlarmConditionState(sourceVariable);
                    condition.Create(SystemContext, conditionNodeId,
                        new QualifiedName(alarmAttr.AttributeName + "Alarm", NamespaceIndex),
                        new LocalizedText("en", alarmAttr.AttributeName + " Alarm"),
                        true);
                    condition.SourceNode.Value = sourceNodeId;
                    condition.SourceName.Value = baseTagRef;
                    condition.ConditionName.Value = alarmAttr.AttributeName;
                    condition.AutoReportStateChanges = true;

                    // Set initial state: enabled, inactive, acknowledged
                    condition.SetEnableState(SystemContext, true);
                    condition.SetActiveState(SystemContext, false);
                    condition.SetAcknowledgedState(SystemContext, true);
                    condition.SetSeverity(SystemContext, EventSeverity.Medium);
                    condition.Retain.Value = false;
                    condition.OnReportEvent = (context, node, e) => Server.ReportEvent(context, e);
                    condition.OnAcknowledge = OnAlarmAcknowledge;
                    condition.OnConfirm = OnAlarmConfirm;
                    condition.OnAddComment = OnAlarmAddComment;
                    condition.OnEnableDisable = OnAlarmEnableDisable;
                    condition.OnShelve = OnAlarmShelve;
                    condition.OnTimedUnshelve = OnAlarmTimedUnshelve;

                    // Add HasCondition reference from source to condition
                    if (sourceVariable != null)
                    {
                        sourceVariable.AddReference(ReferenceTypeIds.HasCondition, false, conditionNodeId);
                        condition.AddReference(ReferenceTypeIds.HasCondition, true, sourceNodeId);
                    }

                    AddPredefinedNode(SystemContext, condition);

                    // Sub-attribute references are always "<base>.<suffix>", so they are never empty
                    // and can be registered in the reverse lookups unconditionally.
                    var priorityTagRef = baseTagRef + ".Priority";
                    var descTagRef = baseTagRef + ".DescAttrName";
                    var ackedTagRef = baseTagRef + ".Acked";
                    var ackMsgTagRef = baseTagRef + ".AckMsg";
                    var alarmInfo = new AlarmInfo
                    {
                        SourceTagReference = alarmAttr.FullTagReference,
                        SourceNodeId = sourceNodeId,
                        SourceName = alarmAttr.AttributeName,
                        ConditionNode = condition,
                        PriorityTagReference = priorityTagRef,
                        DescAttrNameTagReference = descTagRef,
                        AckedTagReference = ackedTagRef,
                        AckMsgTagReference = ackMsgTagRef
                    };
                    _alarmInAlarmTags[inAlarmTagRef] = alarmInfo;
                    _alarmAckedTags[ackedTagRef] = alarmInfo;
                    _alarmPriorityTags[priorityTagRef] = alarmInfo;
                    _alarmDescTags[descTagRef] = alarmInfo;
                    hasAlarms = true;
                }

                // Enable EventNotifier on this node and all ancestors so alarm events propagate
                if (hasAlarms && _nodeMap.TryGetValue(obj.GobjectId, out var objNode))
                    EnableEventNotifierUpChain(objNode);
            }

            // Auto-subscribe to InAlarm tags so we detect alarm transitions
            SubscribeAlarmTags();
        }

        BuildHostedVariablesMap(hierarchy);

        // Sync the galaxy runtime probe set against the rebuilt hierarchy. This runs
        // synchronously on the calling thread and issues AdviseSupervisory per host —
        // expected 500ms-1s additional startup latency for a large multi-host galaxy.
        // Bounded by _mxAccessRequestTimeout so a hung probe sync cannot park the address
        // space rebuild indefinitely; on timeout we log a warning and continue with the
        // partial probe set (probe sync is advisory, not required for address space correctness).
        if (_galaxyRuntimeProbeManager != null)
        {
            try
            {
                SyncOverAsync.WaitSync(
                    _galaxyRuntimeProbeManager.SyncAsync(hierarchy),
                    _mxAccessRequestTimeout,
                    "GalaxyRuntimeProbeManager.SyncAsync");
            }
            catch (TimeoutException ex)
            {
                Log.Warning(ex, "Runtime probe sync exceeded {Timeout}s; continuing with partial probe set",
                    _mxAccessRequestTimeout.TotalSeconds);
            }
        }

        // Keep defensive copies of the inputs that produced this address space.
        _lastHierarchy = new List<GalaxyObjectInfo>(hierarchy);
        _lastAttributes = new List<GalaxyAttributeInfo>(attributes);

        Log.Information(
            "Address space built: {Objects} objects, {Variables} variables, {Mappings} tag references, {Alarms} alarm tags, {Hosts} runtime hosts",
            ObjectNodeCount, VariableNodeCount, _nodeIdToTagReference.Count, _alarmInAlarmTags.Count,
            _hostedVariables.Count);
    }
}
/// <summary>
/// Applies the configured alarm object filter to <paramref name="sorted"/>, records how many objects it
/// included, logs a one-line summary, and warns about every pattern that matched no object.
/// </summary>
/// <param name="sorted">The parent-first Galaxy hierarchy used for the current build.</param>
/// <returns>
/// The set of included gobject ids, or <see langword="null"/> when no enabled filter is configured,
/// which tells the caller to process alarms unfiltered.
/// </returns>
private HashSet<int>? ResolveAlarmFilterIncludedIds(IReadOnlyList<GalaxyObjectInfo> sorted)
{
    if (_alarmObjectFilter is not { Enabled: true } filter)
    {
        // No active filter: publish a zero count and let the caller skip filtering.
        _alarmFilterIncludedObjectCount = 0;
        return null;
    }

    var included = filter.ResolveIncludedObjects(sorted);
    _alarmFilterIncludedObjectCount = included == null ? 0 : included.Count;

    Log.Information(
        "Alarm filter: {IncludedCount} of {TotalCount} objects included ({PatternCount} pattern(s))",
        _alarmFilterIncludedObjectCount, sorted.Count, filter.PatternCount);

    foreach (var pattern in filter.UnmatchedPatterns)
    {
        Log.Warning("Alarm filter pattern matched zero objects: {Pattern}", pattern);
    }

    return included;
}
/// <summary>
/// Rebuilds <c>_hostedVariables</c> (host gobject_id → hosted variable nodes) and
/// <c>_hostIdsByTagRef</c> (tag reference → owning host ids) from the finished address space.
/// Each object's <c>HostedByGobjectId</c> chain is followed upward, and every <c>$WinPlatform</c>
/// or <c>$AppEngine</c> found on the way receives all of that object's variables. An object under
/// an Engine under a Platform is therefore listed under both, so stopping the Platform also
/// invalidates the variables of every Engine it hosts.
/// </summary>
/// <param name="hierarchy">The Galaxy hierarchy of the current build; null or empty leaves both maps empty.</param>
private void BuildHostedVariablesMap(List<GalaxyObjectInfo> hierarchy)
{
    // Galaxy category ids of runtime hosts.
    const int WinPlatformCategory = 1;
    const int AppEngineCategory = 3;
    // Hop limit for the hosted_by walk, alongside the visited set, against cyclic/misconfigured chains.
    const int MaxHostChainDepth = 32;

    _hostedVariables.Clear();
    _hostIdsByTagRef.Clear();

    if (hierarchy is null || hierarchy.Count == 0)
    {
        return;
    }

    // Last entry wins when a gobject id appears more than once.
    var objectsById = new Dictionary<int, GalaxyObjectInfo>(hierarchy.Count);
    foreach (var info in hierarchy)
    {
        objectsById[info.GobjectId] = info;
    }

    foreach (var owner in hierarchy)
    {
        if (!_gobjectToTagRefs.TryGetValue(owner.GobjectId, out var ownedTagRefs) || ownedTagRefs.Count == 0)
        {
            continue;
        }

        // Resolve the owner's tag references to the variable nodes that were actually created.
        var ownedVariables = new List<BaseDataVariableState>(ownedTagRefs.Count);
        foreach (var tagRef in ownedTagRefs)
        {
            if (_tagToVariableNode.TryGetValue(tagRef, out var variable))
            {
                ownedVariables.Add(variable);
            }
        }

        if (ownedVariables.Count == 0)
        {
            continue;
        }

        // Walk up the hosted_by chain collecting every Platform/Engine encountered. The resulting
        // list is shared by reference across all of this owner's tag references.
        var hostIds = new List<int>();
        var seen = new HashSet<int>();
        var current = owner;
        for (var hops = 0; current != null && hops < MaxHostChainDepth && seen.Add(current.GobjectId); hops++)
        {
            if (current.CategoryId == WinPlatformCategory || current.CategoryId == AppEngineCategory)
            {
                hostIds.Add(current.GobjectId);
            }

            if (current.HostedByGobjectId == 0 ||
                !objectsById.TryGetValue(current.HostedByGobjectId, out var host))
            {
                break;
            }

            current = host;
        }

        if (hostIds.Count == 0)
        {
            continue;
        }

        foreach (var hostId in hostIds)
        {
            if (!_hostedVariables.TryGetValue(hostId, out var hosted))
            {
                hosted = new List<BaseDataVariableState>();
                _hostedVariables[hostId] = hosted;
            }

            hosted.AddRange(ownedVariables);
        }

        // Reverse lookup consumed by the Read-path short-circuit for stopped hosts.
        foreach (var tagRef in ownedTagRefs)
        {
            _hostIdsByTagRef[tagRef] = hostIds;
        }
    }
}
/// <summary>
/// Sets every OPC UA variable hosted by the given Galaxy runtime object (Platform or AppEngine)
/// to <see cref="StatusCodes.BadOutOfService"/> with a fresh timestamp, and notifies monitors of
/// the change. Used for the probe manager's Running → Stopped transition. An unknown gobject id
/// is ignored.
/// </summary>
/// <param name="gobjectId">The runtime host's gobject_id.</param>
public void MarkHostVariablesBadQuality(int gobjectId)
{
    int markedCount;
    lock (Lock)
    {
        if (!_hostedVariables.TryGetValue(gobjectId, out var hosted))
        {
            return;
        }

        // One timestamp for the whole batch so every variable reports the same transition time.
        var stamp = DateTime.UtcNow;
        foreach (var variable in hosted)
        {
            variable.StatusCode = StatusCodes.BadOutOfService;
            variable.Timestamp = stamp;
            variable.ClearChangeMasks(SystemContext, false);
        }

        markedCount = hosted.Count;
    }

    // Logged after releasing the lock.
    Log.Information(
        "Marked {Count} variable(s) BadOutOfService for stopped host gobject_id={GobjectId}",
        markedCount, gobjectId);
}
/// <summary>
/// Adds the six read-only <c>$</c>-prefixed status variables under a host object, so OPC UA clients
/// can subscribe to runtime state without polling the dashboard. Their values are later refreshed
/// by <see cref="UpdateHostRuntimeStatusNodes"/> from the dispatch-thread queue drain.
/// </summary>
/// <param name="hostNode">The $WinPlatform / $AppEngine object node that receives the children.</param>
/// <param name="hostTagName">The host's tag name, used as the NodeId prefix of each child.</param>
/// <returns>The created variables, grouped for later updates.</returns>
private HostRuntimeStatusNodes CreateHostRuntimeStatusNodes(BaseObjectState hostNode, string hostTagName)
{
    // Binds the host node and tag name so each child only needs its own name, type and seed value.
    BaseDataVariableState Child(string browseName, NodeId dataType, object initialValue) =>
        CreateSyntheticVariable(hostNode, hostTagName, browseName, dataType, initialValue);

    // Object initializers evaluate top to bottom, so the children are created in this order.
    return new HostRuntimeStatusNodes
    {
        RuntimeState = Child("$RuntimeState", DataTypeIds.String, "Unknown"),
        LastCallbackTime = Child("$LastCallbackTime", DataTypeIds.DateTime, DateTime.MinValue),
        LastScanState = Child("$LastScanState", DataTypeIds.Boolean, false),
        LastStateChangeTime = Child("$LastStateChangeTime", DataTypeIds.DateTime, DateTime.MinValue),
        FailureCount = Child("$FailureCount", DataTypeIds.Int64, 0L),
        LastError = Child("$LastError", DataTypeIds.String, "")
    };
}
/// <summary>
/// Creates one read-only scalar child variable under <paramref name="parent"/>. Its NodeId is
/// <c>{parentTagName}.{browseName}</c>, it is seeded with <paramref name="initialValue"/> and
/// Good status, and it is registered as a predefined node.
/// </summary>
/// <param name="parent">The object node that owns the new variable.</param>
/// <param name="parentTagName">Tag name of the parent, used to make the NodeId unique per host.</param>
/// <param name="browseName">Browse/display name of the variable (also the NodeId suffix).</param>
/// <param name="dataType">OPC UA data type of the variable.</param>
/// <param name="initialValue">Value published until the first refresh.</param>
/// <returns>The registered variable node.</returns>
private BaseDataVariableState CreateSyntheticVariable(
    BaseObjectState parent, string parentTagName, string browseName, NodeId dataType, object initialValue)
{
    var syntheticNode = CreateVariable(parent, browseName, browseName, dataType, ValueRanks.Scalar);

    // Replace the provisional NodeId from CreateVariable with a host-qualified one so the
    // same "$..." browse name under different hosts yields distinct identifiers.
    syntheticNode.NodeId = new NodeId($"{parentTagName}.{browseName}", NamespaceIndex);
    syntheticNode.Value = initialValue;
    syntheticNode.StatusCode = StatusCodes.Good;
    syntheticNode.Timestamp = DateTime.UtcNow;

    // Clients may only read these diagnostics; values are driven by the probe manager.
    syntheticNode.AccessLevel = AccessLevels.CurrentRead;
    syntheticNode.UserAccessLevel = AccessLevels.CurrentRead;

    AddPredefinedNode(SystemContext, syntheticNode);
    return syntheticNode;
}
/// <summary>
/// Copies the probe manager's current snapshot for one host into that host's six synthetic
/// child variables. Called from the dispatch-thread queue drain after Mark/Clear, so the new
/// state reaches subscribed clients in the same publish cycle. It is a no-op when:
/// <list type="bullet">
/// <item>no probe manager is configured,</item>
/// <item>the host has no synthetic nodes, or</item>
/// <item>the probe manager has no status for the host.</item>
/// </list>
/// Takes the node manager <see cref="Lock"/> internally.
/// </summary>
/// <param name="gobjectId">The runtime host's gobject_id.</param>
private void UpdateHostRuntimeStatusNodes(int gobjectId)
{
    if (_galaxyRuntimeProbeManager is null)
        return;

    HostRuntimeStatusNodes? hostNodes;
    lock (Lock)
    {
        if (!_runtimeStatusNodes.TryGetValue(gobjectId, out hostNodes))
            return;
    }

    // The probe manager is queried between the two Lock sections, i.e. without holding the
    // node-manager lock.
    var snapshot = _galaxyRuntimeProbeManager.GetHostStatus(gobjectId);
    if (snapshot is null)
        return;

    lock (Lock)
    {
        var stamp = DateTime.UtcNow;
        // Nullable snapshot fields fall back to the same defaults the nodes were created with.
        SetSynthetic(hostNodes.RuntimeState, snapshot.State.ToString(), stamp);
        SetSynthetic(hostNodes.LastCallbackTime, snapshot.LastStateCallbackTime ?? DateTime.MinValue, stamp);
        SetSynthetic(hostNodes.LastScanState, snapshot.LastScanState ?? false, stamp);
        SetSynthetic(hostNodes.LastStateChangeTime, snapshot.LastStateChangeTime ?? DateTime.MinValue, stamp);
        SetSynthetic(hostNodes.FailureCount, snapshot.FailureCount, stamp);
        SetSynthetic(hostNodes.LastError, snapshot.LastError ?? "", stamp);
    }
}
/// <summary>
/// Publishes a new value on a synthetic status variable. It writes the value with Good status
/// and the supplied timestamp, then clears the node's change masks so the update is flushed.
/// </summary>
/// <param name="variable">The synthetic variable to update.</param>
/// <param name="value">The new value.</param>
/// <param name="now">Timestamp applied to the variable.</param>
private void SetSynthetic(BaseDataVariableState variable, object value, DateTime now)
{
    // Only this node is touched; synthetic variables have no children to propagate to.
    const bool includeChildren = false;

    variable.Value = value;
    variable.StatusCode = StatusCodes.Good;
    variable.Timestamp = now;
    variable.ClearChangeMasks(SystemContext, includeChildren);
}
/// <summary>
/// Resets OPC UA variables hosted by the given Galaxy runtime object to
/// <see cref="StatusCodes.Good"/>. Invoked by the runtime probe manager's Stopped → Running
/// callback. A variable that also lives under another ancestor host that is still stopped is
/// left untouched. Values are left as-is; subsequent MxAccess on-change updates refresh them
/// as tags change naturally.
/// </summary>
/// <param name="gobjectId">The runtime host's gobject_id.</param>
public void ClearHostVariablesBadQuality(int gobjectId)
{
    var restored = 0;
    var deferred = 0;

    lock (Lock)
    {
        var stamp = DateTime.UtcNow;

        // Walk the tag → ancestor-hosts map rather than mass-clearing _hostedVariables[gobjectId].
        // A blanket clear would wipe Bad status set by a concurrently-stopped sibling host (e.g.
        // a recovering DevPlatform must not clear variables that also live under a still-stopped
        // DevAppEngine).
        foreach (var entry in _hostIdsByTagRef)
        {
            var ancestorIds = entry.Value;
            if (!ancestorIds.Contains(gobjectId))
                continue;

            if (HasOtherStoppedAncestor(ancestorIds))
            {
                deferred++;
                continue;
            }

            if (!_tagToVariableNode.TryGetValue(entry.Key, out var node))
                continue;

            node.StatusCode = StatusCodes.Good;
            node.Timestamp = stamp;
            node.ClearChangeMasks(SystemContext, false);
            restored++;
        }
    }

    Log.Information(
        "Cleared bad-quality override on {Count} variable(s) for recovered host gobject_id={GobjectId} (skipped {Skipped} with other stopped ancestors)",
        restored, gobjectId, deferred);

    // True when any ancestor host other than the recovering one is currently stopped.
    bool HasOtherStoppedAncestor(IEnumerable<int> ancestorIds)
    {
        if (_galaxyRuntimeProbeManager == null)
            return false;

        foreach (var hostId in ancestorIds)
        {
            if (hostId != gobjectId && _galaxyRuntimeProbeManager.IsHostStopped(hostId))
                return true;
        }

        return false;
    }
}
/// <summary>
/// Issues background MxAccess subscriptions for every tracked alarm. Each alarm contributes its
/// InAlarm tag (the dictionary key), Priority, DescAttrName and Acked tags. Tags that are empty
/// or have no published variable node are skipped.
/// </summary>
private void SubscribeAlarmTags()
{
    foreach (var entry in _alarmInAlarmTags)
    {
        var info = entry.Value;
        string[] alarmTags =
        {
            entry.Key,
            info.PriorityTagReference,
            info.DescAttrNameTagReference,
            info.AckedTagReference
        };

        var subscribable = alarmTags.Where(t => !string.IsNullOrEmpty(t) && _tagToVariableNode.ContainsKey(t));
        foreach (var tag in subscribable)
            TrackBackgroundSubscribe(tag, "alarm auto-subscribe");
    }
}
/// <summary>
/// Starts a fire-and-forget <c>SubscribeAsync</c> for <paramref name="tag"/> and records the
/// task under a unique id, so shutdown can drain in-flight work with a bounded timeout. When
/// the task finishes, its entry is removed; a fault is logged with <paramref name="context"/>.
/// Does nothing once dispatch has been disposed.
/// </summary>
/// <param name="tag">The Galaxy tag reference to subscribe.</param>
/// <param name="context">Short description of the caller, included in failure logs.</param>
private void TrackBackgroundSubscribe(string tag, string context)
{
    if (_dispatchDisposed)
        return;

    var ticket = Interlocked.Increment(ref _backgroundSubscribeCounter);
    var pending = _mxAccessClient.SubscribeAsync(tag, (_, _) => { });
    _pendingBackgroundSubscribes[ticket] = pending;

    // Registered after the entry is stored, so TryRemove always finds it even when the task
    // has already completed and the continuation runs inline.
    pending.ContinueWith(finished =>
    {
        _pendingBackgroundSubscribes.TryRemove(ticket, out _);
        if (!finished.IsFaulted)
            return;

        Log.Warning(finished.Exception?.InnerException, "Background subscribe failed ({Context}) for {Tag}",
            context, tag);
    }, TaskContinuationOptions.ExecuteSynchronously);
}
/// <summary>
/// Number of fire-and-forget subscribe tasks started by
/// <see cref="TrackBackgroundSubscribe"/> that have not completed yet. Exposed for tests and
/// the status dashboard subscription panel.
/// </summary>
internal int PendingBackgroundSubscribeCount
{
    get { return _pendingBackgroundSubscribes.Count; }
}
/// <summary>
/// OPC UA Acknowledge handler for Galaxy alarm conditions. It checks the caller's ack
/// permission, then finds the tracked alarm for <paramref name="condition"/>. Finally it writes
/// the comment text (or an empty string) to the alarm's AckMsg tag through MxAccess, blocking
/// until the write completes.
/// </summary>
/// <returns>
/// <c>BadUserAccessDenied</c> without permission, <c>BadNodeIdUnknown</c> for an untracked
/// condition, <c>BadInternalError</c> when the write throws, otherwise Good.
/// </returns>
private ServiceResult OnAlarmAcknowledge(
    ISystemContext context, ConditionState condition, byte[] eventId, LocalizedText comment)
{
    if (!HasAlarmAckPermission(context))
        return new ServiceResult(StatusCodes.BadUserAccessDenied);

    AlarmInfo? target = null;
    foreach (var candidate in _alarmInAlarmTags.Values)
    {
        if (candidate.ConditionNode == condition)
        {
            target = candidate;
            break;
        }
    }

    if (target == null)
        return new ServiceResult(StatusCodes.BadNodeIdUnknown);

    using var scope = _metrics.BeginOperation("AlarmAcknowledge");
    try
    {
        var ackText = comment?.Text ?? "";
        _mxAccessClient.WriteAsync(target.AckMsgTagReference, ackText)
            .GetAwaiter().GetResult();
        Log.Information("Alarm acknowledge sent: {Source} (Message={AckMsg})",
            target.SourceName, ackText);
        return ServiceResult.Good;
    }
    catch (Exception ex)
    {
        scope.SetSuccess(false);
        Interlocked.Increment(ref _alarmAckWriteFailures);
        Log.Warning(ex, "Failed to write AckMsg for {Source}", target.SourceName);
        return new ServiceResult(StatusCodes.BadInternalError);
    }
}
/// <summary>
/// OPC UA Confirm handler. It only logs the confirmation and its comment, then reports success.
/// </summary>
private ServiceResult OnAlarmConfirm(
    ISystemContext context, ConditionState condition, byte[] eventId, LocalizedText comment)
{
    var conditionName = condition.ConditionName?.Value;
    var commentText = comment?.Text;
    Log.Information("Alarm confirmed: {Name} (Comment={Comment})", conditionName, commentText);
    return ServiceResult.Good;
}
/// <summary>
/// OPC UA AddComment handler. It only logs the comment against the condition name, then
/// reports success.
/// </summary>
private ServiceResult OnAlarmAddComment(
    ISystemContext context, ConditionState condition, byte[] eventId, LocalizedText comment)
{
    var conditionName = condition.ConditionName?.Value;
    var commentText = comment?.Text;
    Log.Information("Alarm comment added: {Name} — {Comment}", conditionName, commentText);
    return ServiceResult.Good;
}
/// <summary>
/// OPC UA Enable/Disable handler. It only logs which way the condition was toggled, then
/// reports success.
/// </summary>
private ServiceResult OnAlarmEnableDisable(
    ISystemContext context, ConditionState condition, bool enabling)
{
    var action = enabling ? "ENABLED" : "DISABLED";
    var conditionName = condition.ConditionName?.Value;
    Log.Information("Alarm {Action}: {Name}", action, conditionName);
    return ServiceResult.Good;
}
/// <summary>
/// OPC UA shelve/unshelve handler. It applies the requested shelving state to the alarm
/// condition and logs it.
/// </summary>
/// <param name="shelvingTime">Shelving duration; treated as milliseconds (the log converts it
/// to seconds).</param>
private ServiceResult OnAlarmShelve(
    ISystemContext context, AlarmConditionState alarm, bool shelving, bool oneShot, double shelvingTime)
{
    alarm.SetShelvingState(context, shelving, oneShot, shelvingTime);

    var action = shelving ? "SHELVED" : "UNSHELVED";
    var seconds = shelvingTime / 1000.0;
    Log.Information("Alarm {Action}: {Name} (OneShot={OneShot}, Time={Time}s)",
        action, alarm.ConditionName?.Value, oneShot, seconds);
    return ServiceResult.Good;
}
/// <summary>
/// Called when a timed shelve expires. It returns the alarm to the unshelved state
/// (not shelved, not one-shot, zero duration) and logs it.
/// </summary>
private ServiceResult OnAlarmTimedUnshelve(
    ISystemContext context, AlarmConditionState alarm)
{
    const bool shelved = false;
    const bool oneShot = false;
    alarm.SetShelvingState(context, shelved, oneShot, 0);

    var conditionName = alarm.ConditionName?.Value;
    Log.Information("Alarm timed unshelve: {Name}", conditionName);
    return ServiceResult.Good;
}
/// <summary>
/// Applies an active/cleared transition for a tracked Galaxy alarm to its OPC UA condition
/// node. It then reports the event up the notifier chain of the alarm's source variable.
/// </summary>
/// <param name="info">The tracked alarm. No-op when it has no condition node.</param>
/// <param name="active"><c>true</c> for an activation, <c>false</c> for a clear.</param>
private void ReportAlarmEvent(AlarmInfo info, bool active)
{
    var condition = info.ConditionNode;
    if (condition == null)
        return;

    var severity = info.CachedSeverity;
    string message;
    if (!active)
        message = $"Alarm cleared: {info.SourceName}";
    else if (!string.IsNullOrEmpty(info.CachedMessage))
        message = info.CachedMessage;
    else
        message = $"Alarm active: {info.SourceName}";

    // Set a new EventId so clients can reference this event for acknowledge
    condition.EventId.Value = Guid.NewGuid().ToByteArray();
    condition.SetActiveState(SystemContext, active);
    condition.Message.Value = new LocalizedText("en", message);
    condition.SetSeverity(SystemContext, (EventSeverity)severity);

    // Populate additional event fields
    if (condition.LocalTime != null)
    {
        // TimeZoneDataType: Offset is minutes from UTC, and DaylightSavingInOffset says whether
        // that offset already includes the DST correction. Use the offset in effect *now*
        // (GetUtcOffset) so the number and the flag agree. BaseUtcOffset never includes DST,
        // which made the pair inconsistent during summer time. Evaluating both at one UTC
        // instant also avoids ambiguous local times around the fall-back transition.
        var utcNow = DateTime.UtcNow;
        var localZone = TimeZoneInfo.Local;
        condition.LocalTime.Value = new TimeZoneDataType
        {
            Offset = (short)localZone.GetUtcOffset(utcNow).TotalMinutes,
            DaylightSavingInOffset = localZone.IsDaylightSavingTime(utcNow)
        };
    }

    if (condition.Quality != null)
        condition.Quality.Value = StatusCodes.Good;

    // Retain while active or unacknowledged
    condition.Retain.Value = active || condition.AckedState?.Id?.Value == false;

    // Reset acknowledged state when alarm activates
    if (active)
        condition.SetAcknowledgedState(SystemContext, false);

    // Walk up the notifier chain so events reach subscribers at any ancestor level
    if (_tagToVariableNode.TryGetValue(info.SourceTagReference, out var sourceVar))
        ReportEventUpNotifierChain(sourceVar, condition);

    Log.Information("Alarm {State}: {Source} (Severity={Severity}, Message={Message})",
        active ? "ACTIVE" : "CLEARED", info.SourceName, severity, message);
}
/// <summary>
/// Rebuilds the address space from a fresh Galaxy snapshot. (OPC-010) This delegates to
/// <see cref="SyncAddressSpace"/>. That method rebuilds only the changed gobject subtrees and
/// falls back to a full build when no previous snapshot is cached.
/// </summary>
/// <param name="hierarchy">The latest Galaxy object hierarchy to publish.</param>
/// <param name="attributes">The latest Galaxy attributes to publish.</param>
public void RebuildAddressSpace(List<GalaxyObjectInfo> hierarchy, List<GalaxyAttributeInfo> attributes)
    => SyncAddressSpace(hierarchy, attributes);
/// <summary>
/// Incrementally syncs the address space by detecting changed gobjects and rebuilding only those
/// subtrees. (OPC-010) A full build is performed when no previous snapshot is cached. Structural
/// changes happen under <see cref="Lock"/>. MxAccess unsubscribe/resubscribe I/O for affected
/// tags is issued after the lock is released.
/// </summary>
/// <param name="hierarchy">The latest Galaxy object hierarchy snapshot to compare against the currently published model.</param>
/// <param name="attributes">The latest Galaxy attribute snapshot to compare against the currently published variables.</param>
public void SyncAddressSpace(List<GalaxyObjectInfo> hierarchy, List<GalaxyAttributeInfo> attributes)
{
    var tagsToUnsubscribe = new List<string>();
    var tagsToResubscribe = new List<string>();

    lock (Lock)
    {
        if (_lastHierarchy == null || _lastAttributes == null)
        {
            Log.Information("No previous state cached — performing full build");
            BuildAddressSpace(hierarchy, attributes);
            return;
        }

        var changedIds = AddressSpaceDiff.FindChangedGobjectIds(
            _lastHierarchy, _lastAttributes, hierarchy, attributes);
        if (changedIds.Count == 0)
        {
            Log.Information("No address space changes detected");
            // Cache private copies, exactly as the incremental path below does. Storing the
            // caller's list references would let later mutation of those lists silently change
            // the baseline used by the next diff.
            _lastHierarchy = new List<GalaxyObjectInfo>(hierarchy);
            _lastAttributes = new List<GalaxyAttributeInfo>(attributes);
            return;
        }

        // Expand to include child subtrees in both old and new hierarchy
        changedIds = AddressSpaceDiff.ExpandToSubtrees(changedIds, _lastHierarchy);
        changedIds = AddressSpaceDiff.ExpandToSubtrees(changedIds, hierarchy);
        Log.Information("Incremental sync: {Count} gobjects changed out of {Total}",
            changedIds.Count, hierarchy.Count);

        // Snapshot client subscription ref counts for changed tags before teardown erases them
        var affectedSubscriptions = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase);
        foreach (var id in changedIds)
        {
            if (!_gobjectToTagRefs.TryGetValue(id, out var tagRefs))
                continue;

            foreach (var tagRef in tagRefs)
            {
                if (_subscriptionRefCounts.TryGetValue(tagRef, out var count))
                    affectedSubscriptions[tagRef] = count;
            }
        }

        // Tear down changed subtrees (collects tags for deferred unsubscription)
        TearDownGobjects(changedIds, tagsToUnsubscribe);

        // Rebuild changed subtrees from new data
        var changedHierarchy = hierarchy.Where(h => changedIds.Contains(h.GobjectId)).ToList();
        var changedAttributes = attributes.Where(a => changedIds.Contains(a.GobjectId)).ToList();
        BuildSubtree(changedHierarchy, changedAttributes);

        // Restore subscription bookkeeping for tags that still exist after the rebuild
        foreach (var kvp in affectedSubscriptions)
        {
            if (!_tagToVariableNode.ContainsKey(kvp.Key))
                continue;

            _subscriptionRefCounts[kvp.Key] = kvp.Value;
            tagsToResubscribe.Add(kvp.Key);
        }

        _lastHierarchy = new List<GalaxyObjectInfo>(hierarchy);
        _lastAttributes = new List<GalaxyAttributeInfo>(attributes);

        Log.Information("Incremental sync complete: {Objects} objects, {Variables} variables, {Alarms} alarms",
            ObjectNodeCount, VariableNodeCount, _alarmInAlarmTags.Count);
    }

    // Perform subscribe/unsubscribe I/O outside Lock so read/write/browse operations are not blocked
    foreach (var tag in tagsToUnsubscribe)
    {
        try { _mxAccessClient.UnsubscribeAsync(tag).GetAwaiter().GetResult(); }
        catch (Exception ex) { Log.Warning(ex, "Failed to unsubscribe {Tag} after sync", tag); }
    }

    foreach (var tag in tagsToResubscribe)
    {
        try { _mxAccessClient.SubscribeAsync(tag, (_, _) => { }).GetAwaiter().GetResult(); }
        catch (Exception ex) { Log.Warning(ex, "Failed to restore subscription for {Tag} after sync", tag); }
    }
}
/// <summary>
/// Removes the OPC UA nodes and all tracking state for the given gobjects ahead of an
/// incremental rebuild. Called from <see cref="SyncAddressSpace"/> while it holds
/// <see cref="Lock"/>. MxAccess unsubscribes are not issued here. Instead, the affected tag
/// references are appended to <paramref name="tagsToUnsubscribe"/> so the caller can perform
/// that I/O after releasing the lock.
/// </summary>
/// <param name="gobjectIds">The gobjects whose variables, alarms and object nodes should be removed.</param>
/// <param name="tagsToUnsubscribe">Receives tag references whose MxAccess subscriptions should be released.</param>
private void TearDownGobjects(HashSet<int> gobjectIds, List<string> tagsToUnsubscribe)
{
    foreach (var id in gobjectIds)
    {
        // Remove variable nodes and their tracking data
        if (_gobjectToTagRefs.TryGetValue(id, out var tagRefs))
        {
            // Iterate a defensive snapshot of the per-gobject tag list.
            foreach (var tagRef in tagRefs.ToList())
            {
                // Client-held subscription: drop the ref count now, unsubscribe later outside the lock.
                if (_subscriptionRefCounts.Remove(tagRef))
                    tagsToUnsubscribe.Add(tagRef);

                RemoveAlarmTrackingForSource(tagRef);

                string? variableNodeIdStr = null;
                if (_tagToVariableNode.TryGetValue(tagRef, out var variable))
                {
                    variableNodeIdStr = variable.NodeId?.Identifier as string;
                    DeleteNodeBestEffort(variable.NodeId, tagRef);
                    _tagToVariableNode.Remove(tagRef);
                }

                RemoveNodeIdMapping(tagRef, variableNodeIdStr);
                _tagMetadata.Remove(tagRef);
                VariableNodeCount--;
            }

            _gobjectToTagRefs.Remove(id);
        }

        // Delete the object/folder node itself
        if (_nodeMap.TryGetValue(id, out var objNode))
        {
            DeleteNodeBestEffort(objNode.NodeId, "gobject_id=" + id);
            _nodeMap.Remove(id);
            if (!(objNode is FolderState))
                ObjectNodeCount--;
        }
    }

    // Drops every alarm whose source is sourceTagRef: its condition node, its lookup entries,
    // and (deferred) its InAlarm/Priority/DescAttrName subscriptions.
    void RemoveAlarmTrackingForSource(string sourceTagRef)
    {
        var alarmKeysToRemove = _alarmInAlarmTags
            .Where(kvp => kvp.Value.SourceTagReference == sourceTagRef)
            .Select(kvp => kvp.Key)
            .ToList();

        foreach (var alarmKey in alarmKeysToRemove)
        {
            var info = _alarmInAlarmTags[alarmKey];

            // Defer alarm tag unsubscription to outside lock
            foreach (var alarmTag in new[] { alarmKey, info.PriorityTagReference, info.DescAttrNameTagReference })
            {
                if (!string.IsNullOrEmpty(alarmTag))
                    tagsToUnsubscribe.Add(alarmTag);
            }

            // BuildSubtree registers conditions as separate predefined nodes: it calls
            // AddPredefinedNode and links them with a HasCondition reference, without AddChild.
            // Deleting the source variable therefore leaves the condition behind, so remove it
            // explicitly.
            if (info.ConditionNode != null)
                DeleteNodeBestEffort(info.ConditionNode.NodeId, alarmKey + " condition");

            _alarmInAlarmTags.Remove(alarmKey);
            if (!string.IsNullOrEmpty(info.PriorityTagReference))
                _alarmPriorityTags.Remove(info.PriorityTagReference);
            if (!string.IsNullOrEmpty(info.DescAttrNameTagReference))
                _alarmDescTags.Remove(info.DescAttrNameTagReference);
            if (!string.IsNullOrEmpty(info.AckedTagReference))
                _alarmAckedTags.Remove(info.AckedTagReference);
        }
    }

    // Removes the NodeId-string → tag mapping for tagRef. The fast path uses the variable's own
    // NodeId identifier, which is the key used at registration. Otherwise it falls back to a
    // linear scan of the map.
    void RemoveNodeIdMapping(string tagRef, string? candidateKey)
    {
        if (candidateKey != null
            && _nodeIdToTagReference.TryGetValue(candidateKey, out var mappedTag)
            && mappedTag == tagRef)
        {
            _nodeIdToTagReference.Remove(candidateKey);
            return;
        }

        var staleKey = _nodeIdToTagReference.FirstOrDefault(kvp => kvp.Value == tagRef).Key;
        if (staleKey != null)
            _nodeIdToTagReference.Remove(staleKey);
    }

    // Best-effort node deletion: a failure must not abort the remaining teardown bookkeeping,
    // but it is logged instead of silently swallowed.
    void DeleteNodeBestEffort(NodeId nodeId, string description)
    {
        try
        {
            DeleteNode(SystemContext, nodeId);
        }
        catch (Exception ex)
        {
            Log.Warning(ex, "Failed to delete node {NodeId} ({Description}) during incremental sync",
                nodeId, description);
        }
    }
}
/// <summary>
/// Builds OPC UA nodes for a subset of the Galaxy hierarchy, namely the gobjects that changed
/// during an incremental sync. This covers object/area nodes, attribute variables, alarm
/// conditions (when alarm tracking is enabled) and background MxAccess subscriptions for those
/// alarms.
/// </summary>
/// <param name="hierarchy">The gobjects to (re)create. A parent outside this list is resolved
/// from already-published nodes, falling back to the <c>ZB</c> root folder. Objects with
/// neither are skipped.</param>
/// <param name="attributes">The attributes belonging to the gobjects in <paramref name="hierarchy"/>.</param>
private void BuildSubtree(List<GalaxyObjectInfo> hierarchy, List<GalaxyAttributeInfo> attributes)
{
    if (hierarchy.Count == 0)
        return;

    var sorted = TopologicalSort(hierarchy);
    var attrsByObject = attributes
        .GroupBy(a => a.GobjectId)
        .ToDictionary(g => g.Key, g => g.ToList());

    // Find root folder for orphaned nodes
    NodeState? rootFolder = null;
    if (PredefinedNodes.TryGetValue(new NodeId("ZB", NamespaceIndex), out var rootNode))
        rootFolder = rootNode;

    foreach (var obj in sorted)
    {
        NodeState parentNode;
        if (_nodeMap.TryGetValue(obj.ParentGobjectId, out var p))
            parentNode = p;
        else if (rootFolder != null)
            parentNode = rootFolder;
        else
            continue; // no parent available

        // Create node with final NodeId before adding to parent
        NodeState node;
        var nodeId = new NodeId(obj.TagName, NamespaceIndex);
        if (obj.IsArea)
        {
            var folder = new FolderState(parentNode)
            {
                SymbolicName = obj.BrowseName,
                ReferenceTypeId = ReferenceTypes.Organizes,
                TypeDefinitionId = ObjectTypeIds.FolderType,
                NodeId = nodeId,
                BrowseName = new QualifiedName(obj.BrowseName, NamespaceIndex),
                DisplayName = new LocalizedText("en", obj.BrowseName),
                WriteMask = AttributeWriteMask.None,
                UserWriteMask = AttributeWriteMask.None,
                EventNotifier = EventNotifiers.None
            };
            parentNode.AddChild(folder);
            node = folder;
        }
        else
        {
            var objNode = new BaseObjectState(parentNode)
            {
                SymbolicName = obj.BrowseName,
                ReferenceTypeId = ReferenceTypes.HasComponent,
                TypeDefinitionId = ObjectTypeIds.BaseObjectType,
                NodeId = nodeId,
                BrowseName = new QualifiedName(obj.BrowseName, NamespaceIndex),
                DisplayName = new LocalizedText("en", obj.BrowseName),
                WriteMask = AttributeWriteMask.None,
                UserWriteMask = AttributeWriteMask.None,
                EventNotifier = EventNotifiers.None
            };
            parentNode.AddChild(objNode);
            node = objNode;
            ObjectNodeCount++;
        }

        AddPredefinedNode(SystemContext, node);
        _nodeMap[obj.GobjectId] = node;
        parentNode.ClearChangeMasks(SystemContext, false);

        // Create variable nodes (same logic as BuildAddressSpace)
        if (attrsByObject.TryGetValue(obj.GobjectId, out var objAttrs))
        {
            // Materialized once: the groups are enumerated three times below.
            var byPrimitive = objAttrs
                .GroupBy(a => a.PrimitiveName ?? "")
                .OrderBy(g => g.Key)
                .ToList();
            var primitiveGroupNames = new HashSet<string>(
                byPrimitive.Select(g => g.Key).Where(k => !string.IsNullOrEmpty(k)),
                StringComparer.OrdinalIgnoreCase);

            // Direct attributes that share a name with a primitive group become the parent of
            // that group's attributes instead of getting a separate object node.
            var variableNodes = new Dictionary<string, BaseDataVariableState>(StringComparer.OrdinalIgnoreCase);
            var directGroup = byPrimitive.FirstOrDefault(g => string.IsNullOrEmpty(g.Key));
            if (directGroup != null)
            {
                foreach (var attr in directGroup)
                {
                    var variable = CreateAttributeVariable(node, attr);
                    if (primitiveGroupNames.Contains(attr.AttributeName))
                        variableNodes[attr.AttributeName] = variable;
                }
            }

            foreach (var group in byPrimitive)
            {
                if (string.IsNullOrEmpty(group.Key))
                    continue;

                NodeState parentForAttrs;
                if (variableNodes.TryGetValue(group.Key, out var existingVariable))
                {
                    parentForAttrs = existingVariable;
                }
                else
                {
                    var primNode = CreateObject(node, group.Key, group.Key);
                    primNode.NodeId = new NodeId(obj.TagName + "." + group.Key, NamespaceIndex);
                    AddPredefinedNode(SystemContext, primNode);
                    parentForAttrs = primNode;
                }

                foreach (var attr in group)
                    CreateAttributeVariable(parentForAttrs, attr);
            }
        }
    }

    // Alarm tracking for the new subtree
    if (_alarmTrackingEnabled)
    {
        var includedIds = ResolveAlarmFilterIncludedIds(sorted);
        foreach (var obj in sorted)
        {
            if (obj.IsArea) continue;
            if (includedIds != null && !includedIds.Contains(obj.GobjectId)) continue;
            if (!attrsByObject.TryGetValue(obj.GobjectId, out var objAttrs)) continue;

            var hasAlarms = false;
            var alarmAttrs = objAttrs.Where(a => a.IsAlarm && string.IsNullOrEmpty(a.PrimitiveName)).ToList();
            foreach (var alarmAttr in alarmAttrs)
            {
                var inAlarmTagRef = alarmAttr.FullTagReference.TrimEnd('[', ']') + ".InAlarm";
                if (!_tagToVariableNode.ContainsKey(inAlarmTagRef))
                    continue;

                var alarmNodeIdStr = alarmAttr.FullTagReference.EndsWith("[]")
                    ? alarmAttr.FullTagReference.Substring(0, alarmAttr.FullTagReference.Length - 2)
                    : alarmAttr.FullTagReference;
                _tagToVariableNode.TryGetValue(alarmAttr.FullTagReference, out var sourceVariable);
                var sourceNodeId = new NodeId(alarmNodeIdStr, NamespaceIndex);
                var conditionNodeId = new NodeId(alarmNodeIdStr + ".Condition", NamespaceIndex);

                var condition = new AlarmConditionState(sourceVariable);
                condition.Create(SystemContext, conditionNodeId,
                    new QualifiedName(alarmAttr.AttributeName + "Alarm", NamespaceIndex),
                    new LocalizedText("en", alarmAttr.AttributeName + " Alarm"),
                    true);
                condition.SourceNode.Value = sourceNodeId;
                condition.SourceName.Value = alarmAttr.FullTagReference.TrimEnd('[', ']');
                condition.ConditionName.Value = alarmAttr.AttributeName;
                condition.AutoReportStateChanges = true;
                condition.SetEnableState(SystemContext, true);
                condition.SetActiveState(SystemContext, false);
                condition.SetAcknowledgedState(SystemContext, true);
                condition.SetSeverity(SystemContext, EventSeverity.Medium);
                condition.Retain.Value = false;
                condition.OnReportEvent = (context, n, e) => Server.ReportEvent(context, e);
                condition.OnAcknowledge = OnAlarmAcknowledge;

                if (sourceVariable != null)
                {
                    sourceVariable.AddReference(ReferenceTypeIds.HasCondition, false, conditionNodeId);
                    condition.AddReference(ReferenceTypeIds.HasCondition, true, sourceNodeId);
                }

                AddPredefinedNode(SystemContext, condition);

                var baseTagRef = alarmAttr.FullTagReference.TrimEnd('[', ']');
                var alarmInfo = new AlarmInfo
                {
                    SourceTagReference = alarmAttr.FullTagReference,
                    SourceNodeId = sourceNodeId,
                    SourceName = alarmAttr.AttributeName,
                    ConditionNode = condition,
                    PriorityTagReference = baseTagRef + ".Priority",
                    DescAttrNameTagReference = baseTagRef + ".DescAttrName",
                    AckedTagReference = baseTagRef + ".Acked",
                    AckMsgTagReference = baseTagRef + ".AckMsg"
                };
                _alarmInAlarmTags[inAlarmTagRef] = alarmInfo;
                _alarmAckedTags[alarmInfo.AckedTagReference] = alarmInfo;
                if (!string.IsNullOrEmpty(alarmInfo.PriorityTagReference))
                    _alarmPriorityTags[alarmInfo.PriorityTagReference] = alarmInfo;
                if (!string.IsNullOrEmpty(alarmInfo.DescAttrNameTagReference))
                    _alarmDescTags[alarmInfo.DescAttrNameTagReference] = alarmInfo;
                hasAlarms = true;
            }

            if (hasAlarms && _nodeMap.TryGetValue(obj.GobjectId, out var objNode))
                EnableEventNotifierUpChain(objNode);
        }

        // Subscribe alarm tags that belong to the newly built subtree. The subtree membership
        // set and the tag → owning-gobject lookup are built once instead of once per alarm.
        var subtreeIds = new HashSet<int>(hierarchy.Select(h => h.GobjectId));
        var ownerByTagRef = new Dictionary<string, int>(StringComparer.Ordinal);
        foreach (var attr in attributes)
        {
            // First occurrence wins, matching the former FirstOrDefault lookup.
            if (attr.FullTagReference != null && !ownerByTagRef.ContainsKey(attr.FullTagReference))
                ownerByTagRef[attr.FullTagReference] = attr.GobjectId;
        }

        foreach (var kvp in _alarmInAlarmTags)
        {
            var sourceTagRef = kvp.Value.SourceTagReference;
            if (sourceTagRef == null
                || !ownerByTagRef.TryGetValue(sourceTagRef, out var ownerId)
                || !subtreeIds.Contains(ownerId))
                continue;

            // Include the Acked tag, as SubscribeAlarmTags does. The alarm is registered in
            // _alarmAckedTags above, so ack-state changes need a live subscription.
            foreach (var tag in new[]
                     {
                         kvp.Key, kvp.Value.PriorityTagReference, kvp.Value.DescAttrNameTagReference,
                         kvp.Value.AckedTagReference
                     })
            {
                if (string.IsNullOrEmpty(tag) || !_tagToVariableNode.ContainsKey(tag))
                    continue;

                TrackBackgroundSubscribe(tag, "subtree alarm auto-subscribe");
            }
        }
    }
}
/// <summary>
/// Orders the hierarchy so every parent that is present in the input appears before its
/// children, regardless of input order. Each object is emitted exactly once. Parents that are
/// not part of <paramref name="hierarchy"/> are ignored, and a parent cycle is cut at the first
/// repeated node.
/// </summary>
private static List<GalaxyObjectInfo> TopologicalSort(List<GalaxyObjectInfo> hierarchy)
{
    var lookup = hierarchy.ToDictionary(h => h.GobjectId);
    var seen = new HashSet<int>();
    var ordered = new List<GalaxyObjectInfo>(hierarchy.Count);

    foreach (var start in hierarchy)
    {
        if (!seen.Add(start.GobjectId))
            continue;

        // Climb the parent chain until reaching an already-emitted/claimed node or a parent
        // outside the input, then emit the chain top-down.
        var chain = new Stack<GalaxyObjectInfo>();
        chain.Push(start);
        var current = start;
        while (lookup.TryGetValue(current.ParentGobjectId, out var parent) && seen.Add(parent.GobjectId))
        {
            chain.Push(parent);
            current = parent;
        }

        while (chain.Count > 0)
            ordered.Add(chain.Pop());
    }

    return ordered;
}
/// <summary>
/// Creates and registers the OPC UA variable for one Galaxy attribute under
/// <paramref name="parent"/>. Its data type, value rank, array length and access level
/// (writable per security classification, plus HistoryRead when historized) come from the
/// attribute metadata. Historized attributes get an <c>HAConfiguration</c> child. The variable
/// starts as <c>BadWaitingForInitialData</c>. The method also records the node in the tag,
/// NodeId, metadata and gobject lookup tables and bumps <c>VariableNodeCount</c>.
/// </summary>
/// <param name="parent">The node that owns the new variable.</param>
/// <param name="attr">The Galaxy attribute to publish.</param>
/// <returns>The registered variable node.</returns>
private BaseDataVariableState CreateAttributeVariable(NodeState parent, GalaxyAttributeInfo attr)
{
    var dataTypeId = new NodeId(MxDataTypeMapper.MapToOpcUaDataType(attr.MxDataType));
    var valueRank = attr.IsArray ? ValueRanks.OneDimension : ValueRanks.Scalar;
    var node = CreateVariable(parent, attr.AttributeName, attr.AttributeName, dataTypeId, valueRank);

    var nodeIdentifier = GetNodeIdentifier(attr);
    node.NodeId = new NodeId(nodeIdentifier, NamespaceIndex);
    if (attr.IsArray && attr.ArrayDimension.HasValue)
        node.ArrayDimensions = new ReadOnlyList<uint>(new List<uint> { (uint)attr.ArrayDimension.Value });

    var writable = SecurityClassificationMapper.IsWritable(attr.SecurityClassification);
    var access = writable ? AccessLevels.CurrentReadOrWrite : AccessLevels.CurrentRead;
    if (attr.IsHistorized)
        access |= AccessLevels.HistoryRead;
    node.AccessLevel = access;
    node.UserAccessLevel = access;
    node.Historizing = attr.IsHistorized;

    if (attr.IsHistorized)
        AttachHistoricalConfiguration(node, nodeIdentifier);

    var tagRef = attr.FullTagReference;
    node.Value = NormalizePublishedValue(tagRef, null);
    node.StatusCode = StatusCodes.BadWaitingForInitialData;
    node.Timestamp = DateTime.UtcNow;
    AddPredefinedNode(SystemContext, node);

    _nodeIdToTagReference[nodeIdentifier] = tagRef;
    _tagToVariableNode[tagRef] = node;
    _tagMetadata[tagRef] = new TagMetadata
    {
        MxDataType = attr.MxDataType,
        IsArray = attr.IsArray,
        ArrayDimension = attr.ArrayDimension,
        SecurityClassification = attr.SecurityClassification
    };

    // Remember which tags each gobject owns so incremental sync can tear them down later.
    if (!_gobjectToTagRefs.TryGetValue(attr.GobjectId, out var ownedTags))
    {
        ownedTags = new List<string>();
        _gobjectToTagRefs[attr.GobjectId] = ownedTags;
    }
    ownedTags.Add(tagRef);

    VariableNodeCount++;
    return node;

    // Adds the HistoricalDataConfiguration object (Stepped = false, Definition =
    // "Wonderware Historian") beneath a historized variable and registers it.
    void AttachHistoricalConfiguration(BaseDataVariableState owner, string ownerId)
    {
        var config = new BaseObjectState(owner)
        {
            NodeId = new NodeId(ownerId + ".HAConfiguration", NamespaceIndex),
            BrowseName = new QualifiedName("HAConfiguration", NamespaceIndex),
            DisplayName = "HA Configuration",
            TypeDefinitionId = ObjectTypeIds.HistoricalDataConfigurationType
        };

        var stepped = new PropertyState<bool>(config)
        {
            NodeId = new NodeId(ownerId + ".HAConfiguration.Stepped", NamespaceIndex),
            BrowseName = BrowseNames.Stepped,
            DisplayName = "Stepped",
            Value = false,
            AccessLevel = AccessLevels.CurrentRead,
            UserAccessLevel = AccessLevels.CurrentRead
        };
        config.AddChild(stepped);

        var definition = new PropertyState<string>(config)
        {
            NodeId = new NodeId(ownerId + ".HAConfiguration.Definition", NamespaceIndex),
            BrowseName = BrowseNames.Definition,
            DisplayName = "Definition",
            Value = "Wonderware Historian",
            AccessLevel = AccessLevels.CurrentRead,
            UserAccessLevel = AccessLevels.CurrentRead
        };
        config.AddChild(definition);

        owner.AddChild(config);
        AddPredefinedNode(SystemContext, config);
    }
}
/// <summary>
/// Returns the string NodeId identifier for an attribute. This is its full tag reference,
/// except that an array attribute's trailing <c>[]</c> suffix is dropped.
/// </summary>
private static string GetNodeIdentifier(GalaxyAttributeInfo attr)
{
    const string arraySuffix = "[]";
    var reference = attr.FullTagReference;
    if (attr.IsArray && reference.EndsWith(arraySuffix, StringComparison.Ordinal))
        return reference.Substring(0, reference.Length - arraySuffix.Length);

    return reference;
}
/// <summary>
/// Creates a FolderType node, linked by Organizes, with NodeId <paramref name="path"/> and
/// browse/display name <paramref name="name"/>. When <paramref name="parent"/> is non-null,
/// the folder is attached to it. The folder is not registered as a predefined node here.
/// </summary>
private FolderState CreateFolder(NodeState? parent, string path, string name)
{
    var label = new LocalizedText("en", name);
    var created = new FolderState(parent)
    {
        SymbolicName = name,
        ReferenceTypeId = ReferenceTypes.Organizes,
        TypeDefinitionId = ObjectTypeIds.FolderType,
        NodeId = new NodeId(path, NamespaceIndex),
        BrowseName = new QualifiedName(name, NamespaceIndex),
        DisplayName = label,
        WriteMask = AttributeWriteMask.None,
        UserWriteMask = AttributeWriteMask.None,
        EventNotifier = EventNotifiers.None
    };

    if (parent != null)
        parent.AddChild(created);

    return created;
}
/// <summary>
/// Creates a BaseObjectType node, linked by HasComponent, with NodeId <paramref name="path"/>
/// and browse/display name <paramref name="name"/>, and attaches it to
/// <paramref name="parent"/>. The node is not registered as a predefined node here.
/// </summary>
private BaseObjectState CreateObject(NodeState parent, string path, string name)
{
    var label = new LocalizedText("en", name);
    var created = new BaseObjectState(parent)
    {
        SymbolicName = name,
        ReferenceTypeId = ReferenceTypes.HasComponent,
        TypeDefinitionId = ObjectTypeIds.BaseObjectType,
        NodeId = new NodeId(path, NamespaceIndex),
        BrowseName = new QualifiedName(name, NamespaceIndex),
        DisplayName = label,
        WriteMask = AttributeWriteMask.None,
        UserWriteMask = AttributeWriteMask.None,
        EventNotifier = EventNotifiers.None
    };

    parent.AddChild(created);
    return created;
}
/// <summary>
/// Creates a BaseDataVariableType node, linked by HasComponent, and attaches it to
/// <paramref name="parent"/>. Defaults:
/// <list type="bullet">
/// <item>read/write access,</item>
/// <item>not historizing,</item>
/// <item>Good status,</item>
/// <item>current UTC timestamp.</item>
/// </list>
/// Callers typically override NodeId/access afterwards. The node is not registered as a
/// predefined node here.
/// </summary>
private BaseDataVariableState CreateVariable(NodeState parent, string path, string name, NodeId dataType,
    int valueRank)
{
    var label = new LocalizedText("en", name);
    var created = new BaseDataVariableState(parent)
    {
        SymbolicName = name,
        ReferenceTypeId = ReferenceTypes.HasComponent,
        TypeDefinitionId = VariableTypeIds.BaseDataVariableType,
        NodeId = new NodeId(path, NamespaceIndex),
        BrowseName = new QualifiedName(name, NamespaceIndex),
        DisplayName = label,
        WriteMask = AttributeWriteMask.None,
        UserWriteMask = AttributeWriteMask.None,
        DataType = dataType,
        ValueRank = valueRank,
        AccessLevel = AccessLevels.CurrentReadOrWrite,
        UserAccessLevel = AccessLevels.CurrentReadOrWrite,
        Historizing = false,
        StatusCode = StatusCodes.Good,
        Timestamp = DateTime.UtcNow
    };

    parent.AddChild(created);
    return created;
}
#region Condition Refresh
/// <inheritdoc />
/// <param name="context">The OPC UA request context for the condition refresh operation.</param>
/// <param name="monitoredItems">The monitored event items that should receive retained alarm conditions.</param>
public override ServiceResult ConditionRefresh(OperationContext context,
    IList<IEventMonitoredItem> monitoredItems)
{
    // Only conditions whose Retain flag is currently true are replayed to the refresh.
    var retainedConditions = _alarmInAlarmTags.Values
        .Select(alarm => alarm.ConditionNode)
        .OfType<AlarmConditionState>()
        .Where(node => node.Retain?.Value == true);

    foreach (var retained in retainedConditions)
    {
        foreach (var monitoredItem in monitoredItems)
            monitoredItem.QueueEvent(retained);
    }

    return ServiceResult.Good;
}
#endregion
/// <summary>
/// Per-tag Galaxy metadata captured when the attribute variable is created. It is consulted
/// when converting MxAccess values and when authorizing writes.
/// </summary>
private sealed class TagMetadata
{
    /// <summary>MXAccess data type code used to map Galaxy values into OPC UA variants.</summary>
    public int MxDataType { get; set; }

    /// <summary>Whether the Galaxy attribute is exposed as a one-dimensional array node.</summary>
    public bool IsArray { get; set; }

    /// <summary>Declared array length from Galaxy metadata, when the attribute is an array and a length is known.</summary>
    public int? ArrayDimension { get; set; }

    /// <summary>
    /// Galaxy security classification (0=FreeAccess, 1=Operate, 4=Tune, 5=Configure, etc.).
    /// Used at write time to decide which write role is required.
    /// </summary>
    public int SecurityClassification { get; set; }
}
/// <summary>
/// Tracking record for one Galaxy alarm. It holds:
/// <list type="bullet">
/// <item>the source attribute,</item>
/// <item>the OPC UA condition node that represents the alarm,</item>
/// <item>the companion Galaxy tags that drive it,</item>
/// <item>the last observed state, used to suppress duplicate transitions.</item>
/// </list>
/// </summary>
private sealed class AlarmInfo
{
    // ---- Source identity -------------------------------------------------------------

    /// <summary>Full tag reference of the process value whose alarm state is tracked.</summary>
    public string SourceTagReference { get; set; } = "";

    /// <summary>NodeId of the source variable that owns the alarm condition.</summary>
    public NodeId SourceNodeId { get; set; } = NodeId.Null;

    /// <summary>Operator-facing source name used in generated alarm events.</summary>
    public string SourceName { get; set; } = "";

    /// <summary>Retained OPC UA condition node associated with the source alarm.</summary>
    public AlarmConditionState? ConditionNode { get; set; }

    // ---- Companion Galaxy tag references ---------------------------------------------

    /// <summary>Tag reference supplying runtime alarm priority updates.</summary>
    public string PriorityTagReference { get; set; } = "";

    /// <summary>Tag reference (or attribute binding) used to resolve the alarm message text.</summary>
    public string DescAttrNameTagReference { get; set; } = "";

    /// <summary>Tag reference for the alarm's acknowledged state.</summary>
    public string AckedTagReference { get; set; } = "";

    /// <summary>Tag reference for the acknowledge message; writing it triggers acknowledgment.</summary>
    public string AckMsgTagReference { get; set; } = "";

    // ---- Cached / last-seen state ----------------------------------------------------

    /// <summary>OPC UA severity derived from the latest alarm priority value.</summary>
    public ushort CachedSeverity { get; set; }

    /// <summary>Alarm message used when emitting active and cleared events.</summary>
    public string CachedMessage { get; set; } = "";

    /// <summary>Most recent in-alarm state, kept so duplicate transitions are not reissued.</summary>
    public bool LastInAlarm { get; set; }

    /// <summary>Most recent acknowledged state (null until first observed), kept so duplicate transitions are not reissued.</summary>
    public bool? LastAcked { get; set; }
}
#region Read/Write Handlers
/// <inheritdoc />
public override void Read(OperationContext context, double maxAge, IList<ReadValueId> nodesToRead,
    IList<DataValue> results, IList<ServiceResult> errors)
{
    base.Read(context, maxAge, nodesToRead, results, errors);

    for (var index = 0; index < nodesToRead.Count; index++)
    {
        var request = nodesToRead[index];
        if (request.AttributeId != Attributes.Value)
            continue;

        var requestedId = request.NodeId;
        if (requestedId.NamespaceIndex != NamespaceIndex)
            continue;
        if (!(requestedId.Identifier is string identifier))
            continue;
        if (!_nodeIdToTagReference.TryGetValue(identifier, out var tagRef))
            continue;

        // Short-circuit when the owning galaxy runtime host is currently Stopped: return the
        // last cached value with BadOutOfService, so the operator sees a uniform dead-host signal
        // instead of MxAccess silently serving stale data. This covers both direct Read requests
        // and OPC UA monitored-item sampling, which also flow through this override.
        if (IsTagUnderStoppedHost(tagRef))
        {
            results[index] = BuildOutOfServiceValue(tagRef);
            errors[index] = ServiceResult.Good;
            continue;
        }

        // Otherwise replace the cached value produced by base.Read with a live MxAccess read,
        // bounded by the configured request timeout.
        try
        {
            var vtq = SyncOverAsync.WaitSync(
                _mxAccessClient.ReadAsync(tagRef),
                _mxAccessRequestTimeout,
                "MxAccessClient.ReadAsync");
            results[index] = CreatePublishedDataValue(tagRef, vtq);
            errors[index] = ServiceResult.Good;
        }
        catch (TimeoutException ex)
        {
            Log.Warning(ex, "Read timed out for {TagRef}", tagRef);
            errors[index] = new ServiceResult(StatusCodes.BadTimeout);
        }
        catch (Exception ex)
        {
            Log.Warning(ex, "Read failed for {TagRef}", tagRef);
            errors[index] = new ServiceResult(StatusCodes.BadInternalError);
        }
    }

    // Last published value and timestamp of the variable (if any), flagged BadOutOfService.
    DataValue BuildOutOfServiceValue(string tagRef)
    {
        _tagToVariableNode.TryGetValue(tagRef, out var cachedVar);
        return new DataValue
        {
            Value = cachedVar?.Value,
            StatusCode = StatusCodes.BadOutOfService,
            SourceTimestamp = cachedVar?.Timestamp ?? DateTime.UtcNow,
            ServerTimestamp = DateTime.UtcNow
        };
    }
}
/// <summary>
/// Determines whether any Galaxy runtime host recorded as owning <paramref name="tagRef"/> is currently reported as
/// Stopped by the runtime probe manager.
/// </summary>
/// <param name="tagRef">The Galaxy tag reference to check.</param>
/// <returns>
/// <see langword="false"/> when probing is disabled or the tag has no recorded hosts; otherwise whether at least one
/// owning host is Stopped.
/// </returns>
private bool IsTagUnderStoppedHost(string tagRef)
{
    if (_galaxyRuntimeProbeManager == null || !_hostIdsByTagRef.TryGetValue(tagRef, out var owningHostIds))
        return false;

    var position = 0;
    while (position < owningHostIds.Count)
    {
        if (_galaxyRuntimeProbeManager.IsHostStopped(owningHostIds[position]))
            return true;
        position++;
    }

    return false;
}
/// <inheritdoc />
/// <remarks>
/// The base implementation runs first. Afterwards, every Value-attribute write that targets a string NodeId in this
/// namespace and maps to a Galaxy tag is forwarded to MxAccess, unless the base pass already reported
/// <c>BadNotWritable</c>. The session must pass <see cref="HasWritePermission"/> for the tag's Galaxy security
/// classification (1 when no metadata is known), otherwise the write fails with <c>BadUserAccessDenied</c>.
/// A non-empty IndexRange is applied through <see cref="TryApplyArrayElementWrite"/>; an unusable range yields
/// <c>BadIndexRangeInvalid</c>. A successful MxAccess write is mirrored onto the OPC UA variable. An unsuccessful
/// write or unexpected error yields <c>BadInternalError</c>, and a timeout yields <c>BadTimeout</c>.
/// </remarks>
public override void Write(OperationContext context, IList<WriteValue> nodesToWrite,
    IList<ServiceResult> errors)
{
    base.Write(context, nodesToWrite, errors);

    for (var index = 0; index < nodesToWrite.Count; index++)
    {
        var request = nodesToWrite[index];
        if (request.AttributeId != Attributes.Value)
            continue;

        // The base pass rejects writes to read-only nodes (access level); keep that verdict untouched.
        var baseResult = errors[index];
        if (baseResult != null && baseResult.StatusCode == StatusCodes.BadNotWritable)
            continue;

        var nodeId = request.NodeId;
        if (nodeId.NamespaceIndex != NamespaceIndex || nodeId.Identifier is not string identifier)
            continue;
        if (!_nodeIdToTagReference.TryGetValue(identifier, out var tagRef))
            continue;

        var securityClassification = _tagMetadata.TryGetValue(tagRef, out var meta)
            ? meta.SecurityClassification
            : 1;
        if (!HasWritePermission(context, securityClassification))
        {
            errors[index] = new ServiceResult(StatusCodes.BadUserAccessDenied);
            continue;
        }

        try
        {
            var valueToWrite = request.Value.WrappedValue.Value;
            var indexRange = request.IndexRange;
            if (!string.IsNullOrWhiteSpace(indexRange))
            {
                if (!TryApplyArrayElementWrite(tagRef, valueToWrite, indexRange, out var patchedArray))
                {
                    errors[index] = new ServiceResult(StatusCodes.BadIndexRangeInvalid);
                    continue;
                }

                valueToWrite = patchedArray;
            }

            var accepted = SyncOverAsync.WaitSync(
                _mxAccessClient.WriteAsync(tagRef, valueToWrite),
                _mxAccessRequestTimeout,
                "MxAccessClient.WriteAsync");
            if (!accepted)
            {
                errors[index] = new ServiceResult(StatusCodes.BadInternalError);
                continue;
            }

            PublishLocalWrite(tagRef, valueToWrite);
            errors[index] = ServiceResult.Good;
        }
        catch (TimeoutException ex)
        {
            Log.Warning(ex, "Write timed out for {TagRef}", tagRef);
            errors[index] = new ServiceResult(StatusCodes.BadTimeout);
        }
        catch (Exception ex)
        {
            Log.Warning(ex, "Write failed for {TagRef}", tagRef);
            errors[index] = new ServiceResult(StatusCodes.BadInternalError);
        }
    }
}
/// <summary>
/// Decides whether the session behind <paramref name="context"/> may write a tag with the given Galaxy security
/// classification.
/// </summary>
/// <param name="context">The operation context carrying the session's user identity.</param>
/// <param name="securityClassification">The Galaxy security classification of the target tag.</param>
/// <returns>
/// For sessions granted the anonymous well-known role, the configured anonymous-write flag. Otherwise, when a write
/// role is configured for the classification, whether the identity holds it; when none is configured,
/// <see langword="true"/>.
/// </returns>
private bool HasWritePermission(OperationContext context, int securityClassification)
{
    var identity = context.UserIdentity;

    var isAnonymous = identity?.GrantedRoleIds?.Contains(ObjectIds.WellKnownRole_Anonymous) == true;
    if (isAnonymous)
        return _anonymousCanWrite;

    // With no role configured for this classification, role-based write checks are off.
    var requiredRoleId = GetRequiredWriteRole(securityClassification);
    return requiredRoleId == null || HasGrantedRole(identity, requiredRoleId);
}
/// <summary>
/// Maps a Galaxy security classification to the OPC UA role id required to write a tag of that classification.
/// </summary>
/// <param name="securityClassification">
/// Galaxy security classification: 0 FreeAccess and 1 Operate use the operate role, 4 Tune uses the tune role, and
/// 5 Configure plus every other value use the configure role.
/// </param>
/// <returns>The configured role id. It may be <see langword="null"/>, which callers treat as "no role required".</returns>
private NodeId? GetRequiredWriteRole(int securityClassification) => securityClassification switch
{
    0 or 1 => _writeOperateRoleId, // FreeAccess, Operate
    4 => _writeTuneRoleId,         // Tune
    // Configure (5). SecuredWrite (2), VerifiedWrite (3) and ViewOnly (6) are read-only by AccessLevel;
    // if one is reached anyway, demand the most restrictive role.
    _ => _writeConfigureRoleId
};
/// <summary>
/// Checks whether the caller may acknowledge alarms.
/// </summary>
/// <param name="context">The system context; its user identity is consulted only when it is a <c>SystemContext</c>.</param>
/// <returns>
/// <see langword="true"/> when no alarm-ack role is configured; otherwise whether the caller's identity holds that role.
/// </returns>
private bool HasAlarmAckPermission(ISystemContext context)
{
    return _alarmAckRoleId == null
        || HasGrantedRole((context as SystemContext)?.UserIdentity, _alarmAckRoleId);
}
/// <summary>
/// Returns whether <paramref name="identity"/> has been granted <paramref name="roleId"/>.
/// </summary>
/// <param name="identity">The session identity; may be <see langword="null"/>.</param>
/// <param name="roleId">The role to look for; may be <see langword="null"/>.</param>
/// <returns>
/// <see langword="false"/> when the role, the identity, or its granted-role list is missing; otherwise whether the list
/// contains the role.
/// </returns>
private static bool HasGrantedRole(IUserIdentity? identity, NodeId? roleId)
{
    if (roleId == null)
        return false;

    var grantedRoles = identity?.GrantedRoleIds;
    return grantedRoles != null && grantedRoles.Contains(roleId);
}
/// <summary>
/// Sets <c>EventNotifier</c> to <c>SubscribeToEvents</c> on <paramref name="node"/> and on each ancestor reached
/// through <c>Parent</c> that is an object node.
/// </summary>
/// <param name="node">
/// The node to start from. The walk ends at the first node (or ancestor) that is not a <c>BaseInstanceState</c>.
/// </param>
private static void EnableEventNotifierUpChain(NodeState node)
{
    // FolderState derives from BaseObjectState in the OPC UA .NET stack, so this single type test covers folders
    // too. The former separate FolderState branch could never be reached.
    for (var current = node as BaseInstanceState; current != null; current = current.Parent as BaseInstanceState)
    {
        if (current is BaseObjectState objectNode)
            objectNode.EventNotifier = EventNotifiers.SubscribeToEvents;
    }
}
/// <summary>
/// Reports <paramref name="eventInstance"/> on the ancestors of <paramref name="sourceNode"/>, starting with its parent
/// (not the node itself).
/// </summary>
/// <param name="sourceNode">The node whose ancestors receive the event.</param>
/// <param name="eventInstance">The event or condition to report.</param>
/// <remarks>
/// Each ancestor is reported on. The walk continues upward only while the current ancestor is a
/// <c>BaseInstanceState</c>.
/// </remarks>
private void ReportEventUpNotifierChain(BaseInstanceState sourceNode, IFilterTarget eventInstance)
{
    var ancestor = sourceNode.Parent;
    while (ancestor != null)
    {
        ancestor.ReportEvent(SystemContext, eventInstance);
        ancestor = (ancestor as BaseInstanceState)?.Parent;
    }
}
/// <summary>
/// Builds the complete array to send to MxAccess when a client writes a single array element via an OPC UA IndexRange.
/// </summary>
/// <param name="tagRef">Galaxy tag reference of the array attribute.</param>
/// <param name="writeValue">The client-supplied element value; a one-element array is unwrapped first.</param>
/// <param name="indexRange">The IndexRange text. Only a single non-negative integer index is supported.</param>
/// <param name="updatedArray">On success, a copy of the current array with the indexed element replaced.</param>
/// <returns>
/// <see langword="false"/> when the range is not a single non-negative index, the current value is not a
/// one-dimensional array, the index is out of bounds, or the element type cannot be determined.
/// </returns>
/// <exception cref="TimeoutException">
/// The MxAccess read of the current array did not complete within the configured request timeout (Write maps this
/// to <c>BadTimeout</c>).
/// </exception>
private bool TryApplyArrayElementWrite(string tagRef, object? writeValue, string indexRange,
    out object updatedArray)
{
    updatedArray = null!;
    if (!int.TryParse(indexRange, out var index) || index < 0)
        return false;

    // Use the same bounded wait as Read/Write so a stalled MxAccess read cannot block the service thread forever.
    var currentVtq = SyncOverAsync.WaitSync(
        _mxAccessClient.ReadAsync(tagRef),
        _mxAccessRequestTimeout,
        "MxAccessClient.ReadAsync");
    var currentValue = NormalizePublishedValue(tagRef, currentVtq.Value);
    if (currentValue is not Array currentArray || currentArray.Rank != 1 || index >= currentArray.Length)
        return false;

    var elementType = currentArray.GetType().GetElementType();
    if (elementType == null)
        return false;

    // Patch a copy so the published/cached array is never mutated in place.
    var nextArray = (Array)currentArray.Clone();
    var normalizedValue = NormalizeIndexedWriteValue(writeValue);
    nextArray.SetValue(ConvertArrayElementValue(normalizedValue, elementType), index);
    updatedArray = nextArray;
    return true;
}
/// <summary>
/// Unwraps a single-element array to its only element. Any other value, including longer arrays and
/// <see langword="null"/>, is returned unchanged.
/// </summary>
/// <param name="value">The client-supplied value for an indexed write.</param>
/// <returns>The sole element of a one-element array, or <paramref name="value"/> itself.</returns>
private static object? NormalizeIndexedWriteValue(object? value) =>
    value is Array { Length: 1 } singleElement ? singleElement.GetValue(0) : value;
/// <summary>
/// Coerces a client-supplied value to the element type of the array being patched.
/// </summary>
/// <param name="value">The value to convert; may be <see langword="null"/>.</param>
/// <param name="elementType">The CLR element type of the target array.</param>
/// <returns>
/// For a <see langword="null"/> value, the default of a value-type element or <see langword="null"/> for a reference
/// type. A value that already has the element type is returned unchanged. For string arrays, the invariant-culture
/// string form is returned. Otherwise, the result of <c>Convert.ChangeType</c> with the invariant culture.
/// </returns>
/// <remarks>
/// These are machine values exchanged with the Galaxy, not user-facing text, so conversions use the invariant culture.
/// With the current culture, a server running under a comma-decimal culture would parse "1.5" as 15 and format 1.5 as
/// "1,5".
/// </remarks>
/// <exception cref="InvalidCastException">The value cannot be converted to <paramref name="elementType"/>.</exception>
/// <exception cref="FormatException">A string value is not in a format recognized for <paramref name="elementType"/>.</exception>
/// <exception cref="OverflowException">The value is out of range for <paramref name="elementType"/>.</exception>
private static object? ConvertArrayElementValue(object? value, Type elementType)
{
    if (value == null)
        return elementType.IsValueType ? Activator.CreateInstance(elementType) : null;

    if (elementType.IsInstanceOfType(value))
        return value;

    var invariant = System.Globalization.CultureInfo.InvariantCulture;
    if (elementType == typeof(string))
        return Convert.ToString(value, invariant);

    return Convert.ChangeType(value, elementType, invariant);
}
/// <summary>
/// Applies a value that MxAccess just accepted to the tag's OPC UA variable and clears its change masks.
/// </summary>
/// <param name="tagRef">Galaxy tag reference that was written.</param>
/// <param name="value">The value that was written, after any IndexRange patching.</param>
/// <remarks>
/// The lookup and mutation run under the node manager <c>Lock</c>, the same lock the data-change dispatch loop holds
/// while it applies updates and calls ClearChangeMasks. This keeps a client write and a runtime data change from
/// interleaving on the same variable. C# <c>lock</c> is re-entrant, so this is safe even if a caller already holds it.
/// Does nothing when the tag has no variable node.
/// </remarks>
private void PublishLocalWrite(string tagRef, object? value)
{
    lock (Lock)
    {
        if (!_tagToVariableNode.TryGetValue(tagRef, out var variable))
            return;

        var dataValue = CreatePublishedDataValue(tagRef, Vtq.Good(value));
        variable.Value = dataValue.Value;
        variable.StatusCode = dataValue.StatusCode;
        variable.Timestamp = dataValue.SourceTimestamp;
        variable.ClearChangeMasks(SystemContext, false);
    }
}
/// <summary>
/// Converts an MxAccess VTQ into the OPC UA <c>DataValue</c> published for <paramref name="tagRef"/>.
/// </summary>
/// <param name="tagRef">Galaxy tag reference the VTQ belongs to.</param>
/// <param name="vtq">The value/timestamp/quality triple from MxAccess.</param>
/// <returns>
/// The converted data value. When NormalizePublishedValue substitutes a value (a default array for a null value), the
/// VTQ is rebuilt around it with the original timestamp and quality.
/// </returns>
private DataValue CreatePublishedDataValue(string tagRef, Vtq vtq)
{
    var publishedValue = NormalizePublishedValue(tagRef, vtq.Value);
    var source = ReferenceEquals(publishedValue, vtq.Value)
        ? vtq
        : new Vtq(publishedValue, vtq.Timestamp, vtq.Quality);
    return DataValueConverter.FromVtq(source);
}
/// <summary>
/// Returns <paramref name="value"/> unchanged unless it is <see langword="null"/> and the tag's metadata describes an
/// array with a known dimension. In that case a default-filled array of that size is returned.
/// </summary>
/// <param name="tagRef">Galaxy tag reference whose metadata is consulted.</param>
/// <param name="value">The raw runtime value; may be <see langword="null"/>.</param>
/// <returns>The original value, a default array, or <see langword="null"/>.</returns>
private object? NormalizePublishedValue(string tagRef, object? value)
{
    if (value != null)
        return value;

    if (_tagMetadata.TryGetValue(tagRef, out var metadata) && metadata.IsArray && metadata.ArrayDimension.HasValue)
        return CreateDefaultArrayValue(metadata);

    return null;
}
/// <summary>
/// Allocates an array of the CLR type mapped from the tag's MX data type, sized by its <c>ArrayDimension</c>.
/// Elements keep their CLR default, except string arrays, which are filled with <see cref="string.Empty"/>.
/// </summary>
/// <param name="metadata">Tag metadata. <c>ArrayDimension</c> must have a value.</param>
/// <returns>The newly allocated array.</returns>
private static Array CreateDefaultArrayValue(TagMetadata metadata)
{
    var clrElementType = MxDataTypeMapper.MapToClrType(metadata.MxDataType);
    var array = Array.CreateInstance(clrElementType, metadata.ArrayDimension!.Value);
    if (clrElementType != typeof(string))
        return array;

    for (var position = 0; position < array.Length; position++)
        array.SetValue(string.Empty, position);
    return array;
}
#endregion
#region HistoryRead
/// <inheritdoc />
/// <remarks>
/// Serves raw history for tag-backed variables from the historian data source. Requests with <c>IsReadModified</c>
/// set, and any request when no historian is configured, fail with <c>BadHistoryOperationUnsupported</c>. A non-empty
/// continuation point resumes a page stored by an earlier call. When <c>ReturnBounds</c> is set,
/// <see cref="AddBoundingValues"/> adds <c>BadBoundNotFound</c> placeholders for missing start/end samples.
/// NOTE(review): <c>NumValuesPerNode</c> is passed to the historian as the sample limit and is also the page size.
/// If the historian honors that limit, a continuation point can only come from the added bound placeholders. Confirm
/// that truncating raw results at one page is intended.
/// </remarks>
protected override void HistoryReadRawModified(
    ServerSystemContext context,
    ReadRawModifiedDetails details,
    TimestampsToReturn timestampsToReturn,
    IList<HistoryReadValueId> nodesToRead,
    IList<HistoryReadResult> results,
    IList<ServiceResult> errors,
    List<NodeHandle> nodesToProcess,
    IDictionary<NodeId, NodeState> cache)
{
    foreach (var handle in nodesToProcess)
    {
        var idx = handle.Index;
        var continuationPoint = nodesToRead[idx].ContinuationPoint;

        // Resume a page stored by an earlier call.
        if (continuationPoint != null && continuationPoint.Length > 0)
        {
            var remaining = _historyContinuations.Retrieve(continuationPoint);
            if (remaining == null)
            {
                errors[idx] = new ServiceResult(StatusCodes.BadContinuationPointInvalid);
                continue;
            }

            ReturnHistoryPage(remaining, details.NumValuesPerNode, results, errors, idx);
            continue;
        }

        var nodeIdStr = handle.NodeId?.Identifier as string;
        if (nodeIdStr == null || !_nodeIdToTagReference.TryGetValue(nodeIdStr, out var tagRef))
        {
            errors[idx] = new ServiceResult(StatusCodes.BadNodeIdUnknown);
            continue;
        }

        if (_historianDataSource == null)
        {
            errors[idx] = new ServiceResult(StatusCodes.BadHistoryOperationUnsupported);
            continue;
        }

        if (details.IsReadModified)
        {
            errors[idx] = new ServiceResult(StatusCodes.BadHistoryOperationUnsupported);
            continue;
        }

        using var historyScope = _metrics.BeginOperation("HistoryReadRaw");
        try
        {
            // NumValuesPerNode is a uint. Clamp it before narrowing so a client value above int.MaxValue does not
            // wrap to a negative limit.
            var maxValues = details.NumValuesPerNode > 0
                ? (int)Math.Min(details.NumValuesPerNode, (uint)int.MaxValue)
                : 0;
            var dataValues = SyncOverAsync.WaitSync(
                _historianDataSource.ReadRawAsync(
                    tagRef, details.StartTime, details.EndTime, maxValues),
                _historianRequestTimeout,
                "HistorianDataSource.ReadRawAsync");
            if (details.ReturnBounds)
                AddBoundingValues(dataValues, details.StartTime, details.EndTime);
            ReturnHistoryPage(dataValues, details.NumValuesPerNode, results, errors, idx);
        }
        catch (TimeoutException ex)
        {
            historyScope.SetSuccess(false);
            Log.Warning(ex, "HistoryRead raw timed out for {TagRef}", tagRef);
            errors[idx] = new ServiceResult(StatusCodes.BadTimeout);
        }
        catch (Exception ex)
        {
            historyScope.SetSuccess(false);
            Log.Warning(ex, "HistoryRead raw failed for {TagRef}", tagRef);
            errors[idx] = new ServiceResult(StatusCodes.BadInternalError);
        }
    }
}
/// <inheritdoc />
/// <remarks>
/// Serves aggregate (processed) history by mapping each requested OPC UA aggregate to a historian column. A
/// non-empty continuation point resumes a page stored earlier. Aggregate results are returned with page size 0, so
/// everything is returned in one page. A missing or empty aggregate list yields <c>BadAggregateListMismatch</c>, and
/// an aggregate without a historian column yields <c>BadAggregateNotSupported</c>.
/// </remarks>
protected override void HistoryReadProcessed(
    ServerSystemContext context,
    ReadProcessedDetails details,
    TimestampsToReturn timestampsToReturn,
    IList<HistoryReadValueId> nodesToRead,
    IList<HistoryReadResult> results,
    IList<ServiceResult> errors,
    List<NodeHandle> nodesToProcess,
    IDictionary<NodeId, NodeState> cache)
{
    foreach (var handle in nodesToProcess)
    {
        var slot = handle.Index;
        var continuationPoint = nodesToRead[slot].ContinuationPoint;

        // Resume a page stored by an earlier call.
        if (continuationPoint != null && continuationPoint.Length > 0)
        {
            var remaining = _historyContinuations.Retrieve(continuationPoint);
            if (remaining != null)
                ReturnHistoryPage(remaining, 0, results, errors, slot);
            else
                errors[slot] = new ServiceResult(StatusCodes.BadContinuationPointInvalid);
            continue;
        }

        if (handle.NodeId?.Identifier is not string identifier ||
            !_nodeIdToTagReference.TryGetValue(identifier, out var tagRef))
        {
            errors[slot] = new ServiceResult(StatusCodes.BadNodeIdUnknown);
            continue;
        }

        if (_historianDataSource == null)
        {
            errors[slot] = new ServiceResult(StatusCodes.BadHistoryOperationUnsupported);
            continue;
        }

        var aggregateTypes = details.AggregateType;
        if (aggregateTypes == null || aggregateTypes.Count == 0)
        {
            errors[slot] = new ServiceResult(StatusCodes.BadAggregateListMismatch);
            continue;
        }

        // Aggregates pair with nodes by position. Fall back to the first one when the list is shorter.
        var aggregateId = slot < aggregateTypes.Count ? aggregateTypes[slot] : aggregateTypes[0];
        var column = HistorianAggregateMap.MapAggregateToColumn(aggregateId);
        if (column == null)
        {
            errors[slot] = new ServiceResult(StatusCodes.BadAggregateNotSupported);
            continue;
        }

        using var historyScope = _metrics.BeginOperation("HistoryReadProcessed");
        try
        {
            var aggregated = SyncOverAsync.WaitSync(
                _historianDataSource.ReadAggregateAsync(
                    tagRef, details.StartTime, details.EndTime,
                    details.ProcessingInterval, column),
                _historianRequestTimeout,
                "HistorianDataSource.ReadAggregateAsync");
            ReturnHistoryPage(aggregated, 0, results, errors, slot);
        }
        catch (TimeoutException ex)
        {
            historyScope.SetSuccess(false);
            Log.Warning(ex, "HistoryRead processed timed out for {TagRef}", tagRef);
            errors[slot] = new ServiceResult(StatusCodes.BadTimeout);
        }
        catch (Exception ex)
        {
            historyScope.SetSuccess(false);
            Log.Warning(ex, "HistoryRead processed failed for {TagRef}", tagRef);
            errors[slot] = new ServiceResult(StatusCodes.BadInternalError);
        }
    }
}
/// <inheritdoc />
/// <remarks>
/// Passes the timestamps in <c>ReqTimes</c> to the historian and returns its values as a single result without a
/// continuation point. A missing or empty <c>ReqTimes</c> list yields <c>BadInvalidArgument</c>.
/// </remarks>
protected override void HistoryReadAtTime(
    ServerSystemContext context,
    ReadAtTimeDetails details,
    TimestampsToReturn timestampsToReturn,
    IList<HistoryReadValueId> nodesToRead,
    IList<HistoryReadResult> results,
    IList<ServiceResult> errors,
    List<NodeHandle> nodesToProcess,
    IDictionary<NodeId, NodeState> cache)
{
    foreach (var handle in nodesToProcess)
    {
        var slot = handle.Index;

        if (handle.NodeId?.Identifier is not string identifier ||
            !_nodeIdToTagReference.TryGetValue(identifier, out var tagRef))
        {
            errors[slot] = new ServiceResult(StatusCodes.BadNodeIdUnknown);
            continue;
        }

        if (_historianDataSource == null)
        {
            errors[slot] = new ServiceResult(StatusCodes.BadHistoryOperationUnsupported);
            continue;
        }

        var requestedTimes = details.ReqTimes;
        if (requestedTimes == null || requestedTimes.Count == 0)
        {
            errors[slot] = new ServiceResult(StatusCodes.BadInvalidArgument);
            continue;
        }

        using var historyScope = _metrics.BeginOperation("HistoryReadAtTime");
        try
        {
            var samples = SyncOverAsync.WaitSync(
                _historianDataSource.ReadAtTimeAsync(tagRef, requestedTimes.ToArray()),
                _historianRequestTimeout,
                "HistorianDataSource.ReadAtTimeAsync");

            var historyData = new HistoryData();
            historyData.DataValues.AddRange(samples);
            results[slot] = new HistoryReadResult
            {
                StatusCode = StatusCodes.Good,
                HistoryData = new ExtensionObject(historyData)
            };
            errors[slot] = ServiceResult.Good;
        }
        catch (TimeoutException ex)
        {
            historyScope.SetSuccess(false);
            Log.Warning(ex, "HistoryRead at-time timed out for {TagRef}", tagRef);
            errors[slot] = new ServiceResult(StatusCodes.BadTimeout);
        }
        catch (Exception ex)
        {
            historyScope.SetSuccess(false);
            Log.Warning(ex, "HistoryRead at-time failed for {TagRef}", tagRef);
            errors[slot] = new ServiceResult(StatusCodes.BadInternalError);
        }
    }
}
/// <inheritdoc />
/// <remarks>
/// Reads alarm/event history from the historian. The source filter is derived from the requested node:
/// <list type="bullet">
/// <item>a NodeId ending in <c>.Condition</c> filters by the identifier with that suffix removed;</item>
/// <item>a NodeId mapped to a Galaxy tag filters by its tag reference;</item>
/// <item>any other node (e.g. an area or object) passes a <see langword="null"/> source to the historian, presumably
/// meaning "no source filter".</item>
/// </list>
/// Each event is returned with the fixed field list EventId, EventType, SourceNode, SourceName, Time, ReceiveTime,
/// Message, Severity. NOTE(review): the request's event-filter select clauses are not consulted. Confirm that clients
/// rely only on this fixed layout.
/// </remarks>
protected override void HistoryReadEvents(
    ServerSystemContext context,
    ReadEventDetails details,
    TimestampsToReturn timestampsToReturn,
    IList<HistoryReadValueId> nodesToRead,
    IList<HistoryReadResult> results,
    IList<ServiceResult> errors,
    List<NodeHandle> nodesToProcess,
    IDictionary<NodeId, NodeState> cache)
{
    const string conditionSuffix = ".Condition";

    foreach (var handle in nodesToProcess)
    {
        var idx = handle.Index;
        var nodeIdStr = handle.NodeId?.Identifier as string;
        if (_historianDataSource == null)
        {
            errors[idx] = new ServiceResult(StatusCodes.BadHistoryOperationUnsupported);
            continue;
        }

        // Resolve the historian source filter for this node. Alarm condition nodes end with ".Condition", so the
        // suffix is stripped to get the source tag. Tag-backed nodes filter by their tag reference. Any other node
        // (e.g. area/object) leaves the filter null. The suffix test is ordinal because node ids are identifiers,
        // not linguistic text.
        string? sourceName = null;
        if (nodeIdStr != null)
        {
            if (nodeIdStr.EndsWith(conditionSuffix, StringComparison.Ordinal))
                sourceName = nodeIdStr.Substring(0, nodeIdStr.Length - conditionSuffix.Length);
            else if (_nodeIdToTagReference.TryGetValue(nodeIdStr, out var tagRef))
                sourceName = tagRef;
        }

        using var historyScope = _metrics.BeginOperation("HistoryReadEvents");
        try
        {
            // NumValuesPerNode is a uint. Clamp it before narrowing so a client value above int.MaxValue does not
            // wrap to a negative limit.
            var maxEvents = details.NumValuesPerNode > 0
                ? (int)Math.Min(details.NumValuesPerNode, (uint)int.MaxValue)
                : 0;
            var events = SyncOverAsync.WaitSync(
                _historianDataSource.ReadEventsAsync(
                    sourceName, details.StartTime, details.EndTime, maxEvents),
                _historianRequestTimeout,
                "HistorianDataSource.ReadEventsAsync");

            var historyEvent = new HistoryEvent();
            foreach (var evt in events)
            {
                // Standard event field list per OPC UA Part 11:
                // EventId, EventType, SourceNode, SourceName, Time, ReceiveTime, Message, Severity.
                var fields = new HistoryEventFieldList();
                fields.EventFields.Add(new Variant(evt.Id.ToByteArray()));
                fields.EventFields.Add(new Variant(ObjectTypeIds.AlarmConditionType));
                fields.EventFields.Add(new Variant(
                    nodeIdStr != null ? new NodeId(nodeIdStr, NamespaceIndex) : NodeId.Null));
                fields.EventFields.Add(new Variant(evt.Source ?? ""));
                fields.EventFields.Add(new Variant(
                    DateTime.SpecifyKind(evt.EventTime, DateTimeKind.Utc)));
                fields.EventFields.Add(new Variant(
                    DateTime.SpecifyKind(evt.ReceivedTime, DateTimeKind.Utc)));
                fields.EventFields.Add(new Variant(new LocalizedText(evt.DisplayText ?? "")));
                fields.EventFields.Add(new Variant((ushort)evt.Severity));
                historyEvent.Events.Add(fields);
            }

            results[idx] = new HistoryReadResult
            {
                StatusCode = StatusCodes.Good,
                HistoryData = new ExtensionObject(historyEvent)
            };
            errors[idx] = ServiceResult.Good;
        }
        catch (TimeoutException ex)
        {
            historyScope.SetSuccess(false);
            Log.Warning(ex, "HistoryRead events timed out for {NodeId}", nodeIdStr);
            errors[idx] = new ServiceResult(StatusCodes.BadTimeout);
        }
        catch (Exception ex)
        {
            historyScope.SetSuccess(false);
            Log.Warning(ex, "HistoryRead events failed for {NodeId}", nodeIdStr);
            errors[idx] = new ServiceResult(StatusCodes.BadInternalError);
        }
    }
}
/// <summary>
/// Fills <paramref name="results"/> and <paramref name="errors"/> at <paramref name="idx"/> with one page of history.
/// Values beyond the page are stored in the continuation store, and the result carries their continuation point.
/// </summary>
/// <param name="dataValues">All values still to be returned for the node.</param>
/// <param name="numValuesPerNode">Maximum values per page; 0 returns everything in one page.</param>
/// <param name="results">Per-node history results; the entry at <paramref name="idx"/> is replaced.</param>
/// <param name="errors">Per-node operation results; the entry at <paramref name="idx"/> is set to Good.</param>
/// <param name="idx">Index of the node within the request.</param>
private void ReturnHistoryPage(List<DataValue> dataValues, uint numValuesPerNode,
    IList<HistoryReadResult> results, IList<ServiceResult> errors, int idx)
{
    // Compare in uint space before narrowing. A client-supplied NumValuesPerNode above int.MaxValue would otherwise
    // cast to a negative page size and make GetRange throw. Any limit at or above the available count is one page.
    var pageSize = numValuesPerNode == 0 || numValuesPerNode >= (uint)dataValues.Count
        ? dataValues.Count
        : (int)numValuesPerNode;

    var historyData = new HistoryData();
    byte[]? continuationPoint = null;
    if (dataValues.Count > pageSize)
    {
        historyData.DataValues.AddRange(dataValues.GetRange(0, pageSize));
        var remainder = dataValues.GetRange(pageSize, dataValues.Count - pageSize);
        continuationPoint = _historyContinuations.Store(remainder);
    }
    else
    {
        historyData.DataValues.AddRange(dataValues);
    }

    results[idx] = new HistoryReadResult
    {
        StatusCode = StatusCodes.Good,
        HistoryData = new ExtensionObject(historyData),
        ContinuationPoint = continuationPoint
    };
    errors[idx] = ServiceResult.Good;
}
/// <summary>
/// Adds bounding-value placeholders to a raw history result. When no sample sits exactly on the requested start time,
/// a placeholder with status <c>BadBoundNotFound</c> and that timestamp is inserted at the front. The end time is
/// handled the same way, with the placeholder appended at the back.
/// </summary>
/// <param name="dataValues">Raw samples in time order; modified in place.</param>
/// <param name="startTime">Requested start of the interval.</param>
/// <param name="endTime">Requested end of the interval.</param>
/// <remarks>
/// Placeholders carry an explicitly null value through <c>WrappedValue</c>. Assigning the <c>Variant.Null</c> struct
/// to the object-typed <c>Value</c> property boxes it, which can leave a Variant nested inside the DataValue's own
/// Variant instead of an empty value.
/// </remarks>
private static void AddBoundingValues(List<DataValue> dataValues, DateTime startTime, DateTime endTime)
{
    // Start bound: prepend a placeholder unless the first sample sits exactly on the requested start time.
    if (dataValues.Count == 0 || dataValues[0].SourceTimestamp != startTime)
        dataValues.Insert(0, CreateMissingBound(startTime));

    // End bound: after the step above the list is never empty, so the last element always exists.
    if (dataValues[dataValues.Count - 1].SourceTimestamp != endTime)
        dataValues.Add(CreateMissingBound(endTime));

    static DataValue CreateMissingBound(DateTime timestamp) => new DataValue
    {
        WrappedValue = Variant.Null,
        SourceTimestamp = timestamp,
        ServerTimestamp = timestamp,
        StatusCode = StatusCodes.BadBoundNotFound
    };
}
#endregion
#region Subscription Delivery
/// <summary>
/// Called by the OPC UA framework when a monitored item is created. After the base handling, if the item's node maps
/// to a Galaxy tag, the tag's ref-counted MXAccess subscription is taken right away so runtime values can arrive
/// before the client's first publish.
/// </summary>
/// <param name="context">The server system context.</param>
/// <param name="handle">Handle of the monitored node; may be <see langword="null"/>.</param>
/// <param name="monitoredItem">The newly created monitored item.</param>
protected override void OnMonitoredItemCreated(ServerSystemContext context, NodeHandle handle,
    MonitoredItem monitoredItem)
{
    base.OnMonitoredItemCreated(context, handle, monitoredItem);

    if (handle?.NodeId?.Identifier is string nodeIdStr &&
        _nodeIdToTagReference.TryGetValue(nodeIdStr, out var tagRef))
    {
        SubscribeTag(tagRef);
    }
}
/// <summary>
/// Called by the OPC UA framework after monitored items are deleted. Releases one reference on the ref-counted
/// MXAccess subscription for every deleted item whose node maps to a Galaxy tag.
/// </summary>
/// <param name="context">The server system context.</param>
/// <param name="monitoredItems">The deleted monitored items.</param>
protected override void OnDeleteMonitoredItemsComplete(ServerSystemContext context,
    IList<IMonitoredItem> monitoredItems)
{
    foreach (var nodeIdStr in monitoredItems.Select(GetNodeIdString))
    {
        if (nodeIdStr != null && _nodeIdToTagReference.TryGetValue(nodeIdStr, out var tagRef))
            UnsubscribeTag(tagRef);
    }
}
/// <summary>
/// Called by the OPC UA framework after monitored items are transferred to a new session. Rebuilds MXAccess
/// subscription bookkeeping for the Galaxy tags behind the transferred items via RestoreTransferredSubscriptions.
/// </summary>
/// <param name="context">The server system context.</param>
/// <param name="monitoredItems">The transferred monitored items.</param>
/// <remarks>
/// Each node id is resolved with a single <c>TryGetValue</c> instead of <c>ContainsKey</c> followed by the indexer.
/// This avoids a double lookup and cannot throw KeyNotFoundException if the map changes between the two calls.
/// </remarks>
protected override void OnMonitoredItemsTransferred(ServerSystemContext context,
    IList<IMonitoredItem> monitoredItems)
{
    base.OnMonitoredItemsTransferred(context, monitoredItems);

    var transferredTagRefs = new List<string>(monitoredItems.Count);
    foreach (var item in monitoredItems)
    {
        var nodeIdStr = GetNodeIdString(item);
        if (nodeIdStr != null && _nodeIdToTagReference.TryGetValue(nodeIdStr, out var tagRef))
            transferredTagRefs.Add(tagRef);
    }

    RestoreTransferredSubscriptions(transferredTagRefs);
}
/// <inheritdoc />
/// <remarks>
/// Only logs each modified item's id and sampling interval at debug level. MXAccess subscriptions are not touched.
/// </remarks>
protected override void OnModifyMonitoredItemsComplete(ServerSystemContext context,
    IList<IMonitoredItem> monitoredItems)
{
    foreach (var modified in monitoredItems)
    {
        Log.Debug("MonitoredItem modified: Id={Id}, SamplingInterval={Interval}ms",
            modified.Id, modified.SamplingInterval);
    }
}
/// <summary>
/// Extracts the string identifier of the node behind a monitored item.
/// </summary>
/// <param name="item">The monitored item.</param>
/// <returns>
/// The NodeId's string identifier, or <see langword="null"/> when the item's manager handle is not a
/// <c>NodeState</c> or its identifier is not a string.
/// </returns>
private static string? GetNodeIdString(IMonitoredItem item) =>
    item.ManagerHandle is NodeState node ? (node.NodeId?.Identifier as string) : null;
/// <summary>
/// Increments the subscription reference count for a Galaxy tag and opens the runtime subscription when the first OPC
/// UA monitored item appears.
/// </summary>
/// <param name="fullTagReference">The fully qualified Galaxy tag reference to subscribe.</param>
/// <remarks>
/// The count is updated under <c>Lock</c>. The MxAccess subscribe is then awaited synchronously outside the lock and
/// registers a no-op per-tag callback. Values presumably arrive through the client-wide value-change event handled by
/// OnMxAccessDataChange. A failed subscribe is only logged: the reference count is kept, so later calls for the same
/// tag do not retry it.
/// </remarks>
internal void SubscribeTag(string fullTagReference)
{
    bool isFirstReference;
    lock (Lock)
    {
        isFirstReference = !_subscriptionRefCounts.TryGetValue(fullTagReference, out var currentCount);
        _subscriptionRefCounts[fullTagReference] = isFirstReference ? 1 : currentCount + 1;
    }

    if (!isFirstReference)
        return;

    try
    {
        _mxAccessClient.SubscribeAsync(fullTagReference, (_, _) => { }).GetAwaiter().GetResult();
    }
    catch (Exception ex)
    {
        Log.Warning(ex, "Failed to subscribe tag {Tag}", fullTagReference);
    }
}
/// <summary>
/// Decrements the subscription reference count for a Galaxy tag and closes the runtime subscription when no OPC UA
/// monitored items remain.
/// </summary>
/// <param name="fullTagReference">The fully qualified Galaxy tag reference to unsubscribe.</param>
/// <remarks>
/// Untracked tags are ignored. The MxAccess unsubscribe is awaited synchronously outside <c>Lock</c>, and failures
/// are only logged.
/// </remarks>
internal void UnsubscribeTag(string fullTagReference)
{
    var releaseRuntimeSubscription = false;
    lock (Lock)
    {
        if (_subscriptionRefCounts.TryGetValue(fullTagReference, out var count))
        {
            if (count > 1)
            {
                _subscriptionRefCounts[fullTagReference] = count - 1;
            }
            else
            {
                _subscriptionRefCounts.Remove(fullTagReference);
                releaseRuntimeSubscription = true;
            }
        }
    }

    if (!releaseRuntimeSubscription)
        return;

    try
    {
        _mxAccessClient.UnsubscribeAsync(fullTagReference).GetAwaiter().GetResult();
    }
    catch (Exception ex)
    {
        Log.Warning(ex, "Failed to unsubscribe tag {Tag}", fullTagReference);
    }
}
/// <summary>
/// Rebuilds subscription reference counts for monitored items that were transferred by the OPC UA stack.
/// Existing in-memory bookkeeping is preserved to avoid double-counting normal in-process transfers.
/// </summary>
/// <param name="fullTagReferences">The Galaxy tag references represented by the transferred monitored items.</param>
/// <remarks>
/// References are counted case-insensitively. A tag that is not yet tracked gets its transferred item count as its
/// reference count and is subscribed in the background through TrackBackgroundSubscribe. Tags already tracked are
/// left unchanged.
/// </remarks>
internal void RestoreTransferredSubscriptions(IEnumerable<string> fullTagReferences)
{
    var transferredCounts = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase);
    foreach (var tagRef in fullTagReferences)
        transferredCounts[tagRef] = transferredCounts.TryGetValue(tagRef, out var seen) ? seen + 1 : 1;

    var tagsToSubscribe = new List<string>();
    lock (Lock)
    {
        foreach (var entry in transferredCounts)
        {
            if (_subscriptionRefCounts.ContainsKey(entry.Key))
                continue;

            _subscriptionRefCounts[entry.Key] = entry.Value;
            tagsToSubscribe.Add(entry.Key);
        }
    }

    foreach (var tagRef in tagsToSubscribe)
        TrackBackgroundSubscribe(tagRef, "transferred subscription restore");
}
/// <summary>
/// MxAccess value-change callback. Probe addresses are handed straight to the runtime probe manager. Every other
/// change is stored in the pending-changes map (the latest VTQ per address wins), and the dispatch thread is woken.
/// </summary>
/// <param name="address">The MxAccess address whose value changed.</param>
/// <param name="vtq">The new value/timestamp/quality.</param>
/// <remarks>Returns immediately once the node manager has started disposing.</remarks>
private void OnMxAccessDataChange(string address, Vtq vtq)
{
    if (_dispatchDisposed)
        return;

    // Runtime-status probes are bridge-owned subscriptions that only drive the host state machine. They are not in
    // _tagToVariableNode, so normal dispatch would drop them anyway; route them to the probe manager instead.
    var consumedByProbe = _galaxyRuntimeProbeManager != null
        && _galaxyRuntimeProbeManager.HandleProbeUpdate(address, vtq);
    if (consumedByProbe)
        return;

    Interlocked.Increment(ref _totalMxChangeEvents);
    _pendingDataChanges[address] = vtq;

    try
    {
        _dataChangeSignal.Set();
    }
    catch (ObjectDisposedException)
    {
        // A final runtime callback can race with shutdown after the signal has been disposed; ignore it.
    }
}
#endregion
#region Data Change Dispatch
/// <summary>
/// Marks the dispatcher as running and starts the background thread named "OpcUaDataChangeDispatch", which runs
/// DispatchLoop.
/// </summary>
private void StartDispatchThread()
{
    _dispatchRunning = true;

    var dispatchThread = new Thread(DispatchLoop);
    dispatchThread.Name = "OpcUaDataChangeDispatch";
    dispatchThread.IsBackground = true;

    _dispatchThread = dispatchThread;
    dispatchThread.Start();
}
/// <summary>
/// Asks the dispatch loop to exit, wakes it, and waits up to five seconds for the thread to finish.
/// The Join result is not checked.
/// </summary>
private void StopDispatchThread()
{
    _dispatchRunning = false;
    _dataChangeSignal.Set();

    var dispatchThread = _dispatchThread;
    if (dispatchThread != null)
        dispatchThread.Join(TimeSpan.FromSeconds(5));
}
/// <summary>
/// Body of the background dispatch thread. Each pass waits up to 100 ms for <c>_dataChangeSignal</c>, ticks the
/// runtime probe manager, and applies queued host state transitions. It then drains <c>_pendingDataChanges</c>:
/// variable updates plus in-alarm and acknowledged transitions are prepared under <c>Lock</c>, then applied to the
/// OPC UA nodes in a second <c>Lock</c> section so ClearChangeMasks reaches monitored items.
/// </summary>
/// <remarks>
/// The try/catch wraps one whole pass. An unexpected exception is logged and the loop continues with the next pass,
/// but changes already removed from <c>_pendingDataChanges</c> during the failed pass are not retried. The loop exits
/// once <c>_dispatchRunning</c> is cleared.
/// </remarks>
private void DispatchLoop()
{
Log.Information("Data change dispatch thread started");
// The while body is the single try/catch statement below (no braces).
while (_dispatchRunning)
try
{
_dataChangeSignal.WaitOne(TimeSpan.FromMilliseconds(100));
if (!_dispatchRunning)
break;
// Drive time-based probe state transitions on every dispatch tick. The dispatch
// loop already wakes every 100ms via the WaitOne timeout, so this gives us a
// ~10Hz cadence for the Unknown → Stopped timeout without introducing a new
// thread or timer. No-op when the probe manager is disabled.
_galaxyRuntimeProbeManager?.Tick();
// Drain any host-state transitions queued from the STA probe callback. Each
// Mark/Clear call takes its own node manager Lock, which is safe here because
// the dispatch thread is not currently holding it.
while (_pendingHostStateChanges.TryDequeue(out var transition))
{
if (transition.Stopped)
MarkHostVariablesBadQuality(transition.GobjectId);
else
ClearHostVariablesBadQuality(transition.GobjectId);
// Also refresh the synthetic $RuntimeState child nodes on this host so
// subscribed OPC UA clients see the state change in the same publish cycle.
UpdateHostRuntimeStatusNodes(transition.GobjectId);
}
// Snapshot the pending addresses. Each entry is removed individually below, so a change that
// arrives for an address after its removal stays queued for a later pass.
var keys = _pendingDataChanges.Keys.ToList();
if (keys.Count == 0)
{
ReportDispatchMetricsIfDue();
continue;
}
// Prepare updates outside the Lock. Shared-state lookups stay inside the Lock.
var updates =
new List<(string address, BaseDataVariableState variable, DataValue dataValue)>(keys.Count);
var pendingAlarmEvents =
new List<(string address, AlarmInfo info, bool active, ushort? severity, string? message)>();
var pendingAckedEvents = new List<(AlarmInfo info, bool acked)>();
foreach (var address in keys)
{
if (!_pendingDataChanges.TryRemove(address, out var vtq))
continue;
// Suppress updates for tags whose owning Galaxy runtime host is currently
// Stopped. Without this, MxAccess keeps streaming cached values that would
// overwrite the BadOutOfService set by MarkHostVariablesBadQuality — the
// variables would flicker Bad→Good every dispatch cycle and subscribers
// would see a flood of notifications (the original "client freeze" symptom).
// Dropping at the source also means we do no lock/alarm work for dead data.
if (IsTagUnderStoppedHost(address))
{
Interlocked.Increment(ref _suppressedUpdatesCount);
continue;
}
AlarmInfo? alarmInfo = null;
AlarmInfo? ackedAlarmInfo = null;
var newInAlarm = false;
var newAcked = false;
lock (Lock)
{
// A conversion failure for one address is logged and does not abort the rest of the batch.
if (_tagToVariableNode.TryGetValue(address, out var variable))
try
{
var dataValue = CreatePublishedDataValue(address, vtq);
updates.Add((address, variable, dataValue));
}
catch (Exception ex)
{
Log.Warning(ex, "Error preparing data change for {Address}", address);
}
if (_alarmInAlarmTags.TryGetValue(address, out alarmInfo))
{
// In-alarm means boolean true or a non-zero int. NOTE(review): other numeric
// types (short, long, ...) evaluate as not-in-alarm here. Confirm InAlarm is bool/int.
newInAlarm = vtq.Value is true || vtq.Value is 1 ||
(vtq.Value is int intVal && intVal != 0);
// No transition: drop it so only real in-alarm changes are queued below.
if (newInAlarm == alarmInfo.LastInAlarm)
alarmInfo = null;
}
// Cache alarm priority/description values as they arrive via subscription
if (_alarmPriorityTags.TryGetValue(address, out var priorityInfo))
{
// Clamp int/short priorities into 1..1000 before caching them as the OPC UA severity.
if (vtq.Value is int ipCache)
priorityInfo.CachedSeverity =
(ushort)Math.Min(Math.Max(ipCache, 1), 1000);
else if (vtq.Value is short spCache)
priorityInfo.CachedSeverity =
(ushort)Math.Min(Math.Max((int)spCache, 1), 1000);
}
if (_alarmDescTags.TryGetValue(address, out var descInfo))
{
if (vtq.Value is string descCache && !string.IsNullOrEmpty(descCache))
descInfo.CachedMessage = descCache;
}
// Check for Acked transitions — skip if state hasn't changed
if (_alarmAckedTags.TryGetValue(address, out ackedAlarmInfo))
{
newAcked = vtq.Value is true || vtq.Value is 1 ||
(vtq.Value is int ackedIntVal && ackedIntVal != 0);
// The first observed value (LastAcked has no value yet) always counts as a transition.
// The ack event counter is incremented here even if the apply step later skips the
// alarm because it has no ConditionNode.
if (ackedAlarmInfo.LastAcked.HasValue && newAcked == ackedAlarmInfo.LastAcked.Value)
ackedAlarmInfo = null; // No transition → skip
else
{
pendingAckedEvents.Add((ackedAlarmInfo, newAcked));
Interlocked.Increment(ref _alarmAckEventCount);
}
}
}
// Only in-alarm transitions go past this point.
if (alarmInfo == null)
continue;
ushort? severity = null;
string? message = null;
if (newInAlarm)
{
// Use cached values from subscription data changes instead of blocking reads
severity = alarmInfo.CachedSeverity > 0 ? alarmInfo.CachedSeverity : (ushort?)null;
message = !string.IsNullOrEmpty(alarmInfo.CachedMessage)
? alarmInfo.CachedMessage
: null;
}
pendingAlarmEvents.Add((address, alarmInfo, newInAlarm, severity, message));
Interlocked.Increment(ref _alarmTransitionCount);
}
// Apply under Lock so ClearChangeMasks propagates to monitored items.
if (updates.Count > 0 || pendingAlarmEvents.Count > 0 || pendingAckedEvents.Count > 0)
lock (Lock)
{
foreach (var (address, variable, dataValue) in updates)
{
// Skip stale entries: the variable must still be the one registered for this address
// (the map may have changed between the two Lock sections).
if (!_tagToVariableNode.TryGetValue(address, out var currentVariable) ||
!ReferenceEquals(currentVariable, variable))
continue;
variable.Value = dataValue.Value;
variable.StatusCode = dataValue.StatusCode;
variable.Timestamp = dataValue.SourceTimestamp;
variable.ClearChangeMasks(SystemContext, false);
}
foreach (var (address, info, active, severity, message) in pendingAlarmEvents)
{
// Same staleness check for alarm info, then re-check the transition under this Lock.
if (!_alarmInAlarmTags.TryGetValue(address, out var currentInfo) ||
!ReferenceEquals(currentInfo, info))
continue;
if (currentInfo.LastInAlarm == active)
continue;
currentInfo.LastInAlarm = active;
if (severity.HasValue)
currentInfo.CachedSeverity = severity.Value;
if (!string.IsNullOrEmpty(message))
currentInfo.CachedMessage = message!;
try
{
ReportAlarmEvent(currentInfo, active);
}
catch (Exception ex)
{
Log.Warning(ex, "Error reporting alarm event for {Source}", currentInfo.SourceName);
}
}
// Apply Acked state changes
foreach (var (info, acked) in pendingAckedEvents)
{
// Double-check dedup under lock
if (info.LastAcked.HasValue && acked == info.LastAcked.Value)
continue;
// LastAcked is recorded before the ConditionNode check, so the transition is consumed
// even when there is no condition node to update.
info.LastAcked = acked;
var condition = info.ConditionNode;
if (condition == null) continue;
try
{
condition.SetAcknowledgedState(SystemContext, acked);
// Keep the condition retained while it is active or still unacknowledged.
condition.Retain.Value = condition.ActiveState?.Id?.Value == true || !acked;
if (_tagToVariableNode.TryGetValue(info.SourceTagReference, out var src))
ReportEventUpNotifierChain(src, condition);
Log.Information("Alarm {AckState}: {Source}",
acked ? "ACKNOWLEDGED" : "UNACKNOWLEDGED", info.SourceName);
}
catch (Exception ex)
{
Log.Warning(ex, "Error updating acked state for {Source}", info.SourceName);
}
}
}
Interlocked.Add(ref _totalDispatchBatchSize, updates.Count);
Interlocked.Increment(ref _dispatchCycleCount);
ReportDispatchMetricsIfDue();
}
catch (Exception ex)
{
Log.Error(ex, "Unhandled error in data change dispatch loop");
}
Log.Information("Data change dispatch thread stopped");
}
/// <summary>
/// Reports dispatch metrics at most once every 60 seconds. It computes the MxAccess change-event rate and the average
/// dispatch batch size since the previous report, and publishes them on <c>MxChangeEventsPerSecond</c> and
/// <c>AverageDispatchBatchSize</c>. It logs them together with the pending count and the number of updates suppressed
/// for Stopped hosts, then resets the rolling batch, cycle and suppression counters.
/// </summary>
private void ReportDispatchMetricsIfDue()
{
    var now = DateTime.UtcNow;
    var elapsedSeconds = (now - _lastMetricsReportTime).TotalSeconds;
    if (elapsedSeconds < 60)
        return;

    var totalEvents = Interlocked.Read(ref _totalMxChangeEvents);
    var eventsSinceLastReport = totalEvents - Interlocked.Read(ref _lastReportedMxChangeEvents);
    var eventsPerSecond = eventsSinceLastReport / elapsedSeconds;
    Interlocked.Exchange(ref _lastReportedMxChangeEvents, totalEvents);

    var batchTotal = Interlocked.Read(ref _totalDispatchBatchSize);
    var cycleCount = Interlocked.Read(ref _dispatchCycleCount);
    double averageBatch = cycleCount > 0 ? (double)batchTotal / cycleCount : 0;
    var suppressedSinceLastReport = Interlocked.Exchange(ref _suppressedUpdatesCount, 0);

    // Start the next rolling window.
    Interlocked.Exchange(ref _totalDispatchBatchSize, 0);
    Interlocked.Exchange(ref _dispatchCycleCount, 0);
    _lastMetricsReportTime = now;

    MxChangeEventsPerSecond = eventsPerSecond;
    AverageDispatchBatchSize = averageBatch;

    Log.Information(
        "DataChange dispatch: EventsPerSec={EventsPerSec:F1}, AvgBatchSize={AvgBatchSize:F1}, PendingItems={Pending}, TotalEvents={Total}, SuppressedStopped={Suppressed}",
        eventsPerSecond, averageBatch, _pendingDataChanges.Count, totalEvents, suppressedSinceLastReport);
}
/// <inheritdoc />
/// <remarks>
/// When <paramref name="disposing"/> is true, teardown runs in this order:
/// <list type="number">
/// <item>Flag the manager as disposed, so late MxAccess callbacks return early.</item>
/// <item>Detach the value-change handler.</item>
/// <item>Dispose the runtime probe manager while the MxAccess client is still usable.</item>
/// <item>Stop and join the dispatch thread.</item>
/// <item>Wait for outstanding background subscribes.</item>
/// <item>Dispose the wake-up signal.</item>
/// </list>
/// NOTE(review): none of these steps is guarded, so an exception from an earlier step skips the later ones and the
/// base-class dispose.
/// </remarks>
protected override void Dispose(bool disposing)
{
if (disposing)
{
// Set first so OnMxAccessDataChange ignores any callback that still arrives.
_dispatchDisposed = true;
_mxAccessClient.OnTagValueChanged -= OnMxAccessDataChange;
// Dispose the runtime probe manager before the MxAccess client teardown so its
// Unadvise calls reach a live client. Disposing the node manager normally runs
// BEFORE the node manager's containing OpcUaServerHost releases the MxAccess
// client, so the probes close cleanly.
_galaxyRuntimeProbeManager?.Dispose();
// Stops the loop and joins the dispatch thread (bounded wait) before draining subscribes.
StopDispatchThread();
DrainPendingBackgroundSubscribes();
// Disposed last: StopDispatchThread still signals it, and OnMxAccessDataChange tolerates
// ObjectDisposedException for a callback that races with this point.
_dataChangeSignal.Dispose();
}
base.Dispose(disposing);
}
/// <summary>
/// On shutdown, waits up to five seconds in total for the background subscribe tasks currently tracked in
/// <c>_pendingBackgroundSubscribes</c>.
/// </summary>
/// <remarks>
/// <c>Task.WaitAll(Task[], TimeSpan)</c> returns <see langword="false"/> when the timeout elapses; it does not throw.
/// That case is logged as a warning with the number of unfinished tasks, instead of being reported as a completed
/// drain. If every task finishes within the timeout but some faulted, WaitAll throws an
/// <see cref="AggregateException"/>, which is logged at debug level.
/// </remarks>
private void DrainPendingBackgroundSubscribes()
{
    var snapshot = _pendingBackgroundSubscribes.Values.ToArray();
    if (snapshot.Length == 0)
        return;

    var timeout = TimeSpan.FromSeconds(5);
    try
    {
        if (Task.WaitAll(snapshot, timeout))
        {
            Log.Information("Drained {Count} pending background subscribe(s) on shutdown", snapshot.Length);
        }
        else
        {
            var unfinished = snapshot.Count(task => !task.IsCompleted);
            Log.Warning(
                "Timed out after {TimeoutSeconds}s draining background subscribes on shutdown; {Unfinished} of {Count} still pending",
                timeout.TotalSeconds, unfinished, snapshot.Length);
        }
    }
    catch (AggregateException ex)
    {
        // Individual faults were already logged by the tracked continuation; record the
        // aggregate at debug level to aid diagnosis without double-logging each failure.
        Log.Debug(ex, "Background subscribe drain completed with {FaultCount} fault(s)",
            ex.InnerExceptions.Count);
    }
}
#endregion
}
}