using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Opc.Ua;
using Opc.Ua.Server;
using Serilog;
using ZB.MOM.WW.LmxOpcUa.Host.Domain;
using ZB.MOM.WW.LmxOpcUa.Host.Historian;
using ZB.MOM.WW.LmxOpcUa.Host.Metrics;

namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
{
    /// <summary>
    /// Custom node manager that builds the OPC UA address space from Galaxy hierarchy data.
    /// (OPC-002 through OPC-013)
    /// </summary>
    public class LmxNodeManager : CustomNodeManager2
    {
        private static readonly ILogger Log = Serilog.Log.ForContext<LmxNodeManager>();

        private readonly IMxAccessClient _mxAccessClient;
        private readonly PerformanceMetrics _metrics;
        private readonly HistorianDataSource? _historianDataSource;
        private readonly bool _alarmTrackingEnabled;
        private readonly bool _anonymousCanWrite;
        private readonly string _namespaceUri;

        // NodeId → full_tag_reference for read/write resolution
        private readonly Dictionary<string, string> _nodeIdToTagReference =
            new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);

        // Ref-counted MXAccess subscriptions
        private readonly Dictionary<string, int> _subscriptionRefCounts =
            new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase);

        // full_tag_reference → published variable node
        private readonly Dictionary<string, BaseDataVariableState> _tagToVariableNode =
            new Dictionary<string, BaseDataVariableState>(StringComparer.OrdinalIgnoreCase);

        // full_tag_reference → data type / array shape captured when the variable was created
        private readonly Dictionary<string, TagMetadata> _tagMetadata =
            new Dictionary<string, TagMetadata>(StringComparer.OrdinalIgnoreCase);

        private IDictionary<NodeId, IList<IReference>>? _externalReferences;

        // Data change dispatch queue: decouples MXAccess STA callbacks from OPC UA framework Lock
        // NOTE(review): the value type argument was lost from this declaration; Vtq is assumed from
        // the "vtq" values returned by IMxAccessClient.ReadAsync — confirm against OnMxAccessDataChange.
        private readonly ConcurrentDictionary<string, Vtq> _pendingDataChanges =
            new ConcurrentDictionary<string, Vtq>(StringComparer.OrdinalIgnoreCase);

        private readonly AutoResetEvent _dataChangeSignal = new AutoResetEvent(false);
        private Thread? _dispatchThread;
        private volatile bool _dispatchRunning;
        private volatile bool _dispatchDisposed;

        // Dispatch queue metrics
        private long _totalMxChangeEvents;
        private long _lastReportedMxChangeEvents;
        private long _totalDispatchBatchSize;
        private long _dispatchCycleCount;
        private DateTime _lastMetricsReportTime = DateTime.UtcNow;
        private double _lastEventsPerSecond;
        private double _lastAvgBatchSize;

        private sealed class TagMetadata
        {
            /// <summary>
            /// Gets or sets the MXAccess data type code used to map Galaxy values into OPC UA variants.
            /// </summary>
            public int MxDataType { get; set; }

            /// <summary>
            /// Gets or sets a value indicating whether the source Galaxy attribute should be exposed as an array node.
            /// </summary>
            public bool IsArray { get; set; }

            /// <summary>
            /// Gets or sets the declared array length from Galaxy metadata when the attribute is modeled as an array.
            /// </summary>
            public int? ArrayDimension { get; set; }
        }

        // Alarm tracking: maps InAlarm tag reference → alarm source info
        private readonly Dictionary<string, AlarmInfo> _alarmInAlarmTags =
            new Dictionary<string, AlarmInfo>(StringComparer.OrdinalIgnoreCase);

        // Alarm tracking: maps Acked tag reference → alarm source info
        private readonly Dictionary<string, AlarmInfo> _alarmAckedTags =
            new Dictionary<string, AlarmInfo>(StringComparer.OrdinalIgnoreCase);

        // Incremental sync: persistent node map and reverse lookup
        // NOTE(review): the key type argument was lost; int is assumed for gobject ids — confirm
        // against GalaxyObjectInfo.GobjectId.
        private readonly Dictionary<int, NodeState> _nodeMap = new Dictionary<int, NodeState>();
        private readonly Dictionary<int, List<string>> _gobjectToTagRefs = new Dictionary<int, List<string>>();
        private List<GalaxyObjectInfo>? _lastHierarchy;
        private List<GalaxyAttributeInfo>? _lastAttributes;

        private sealed class AlarmInfo
        {
            /// <summary>
            /// Gets or sets the full tag reference for the process value whose alarm state is tracked.
            /// </summary>
            public string SourceTagReference { get; set; } = "";

            /// <summary>
            /// Gets or sets the OPC UA node identifier for the source variable that owns the alarm condition.
            /// </summary>
            public NodeId SourceNodeId { get; set; } = NodeId.Null;

            /// <summary>
            /// Gets or sets the operator-facing source name used in generated alarm events.
            /// </summary>
            public string SourceName { get; set; } = "";

            /// <summary>
            /// Gets or sets the most recent in-alarm state so duplicate transitions are not reissued.
            /// </summary>
            public bool LastInAlarm { get; set; }

            /// <summary>
            /// Gets or sets the retained OPC UA condition node associated with the source alarm.
            /// </summary>
            public AlarmConditionState? ConditionNode { get; set; }

            /// <summary>
            /// Gets or sets the Galaxy tag reference that supplies runtime alarm priority updates.
            /// </summary>
            public string PriorityTagReference { get; set; } = "";

            /// <summary>
            /// Gets or sets the Galaxy tag reference or attribute binding used to resolve the alarm message text.
            /// </summary>
            public string DescAttrNameTagReference { get; set; } = "";

            /// <summary>
            /// Gets or sets the cached OPC UA severity derived from the latest alarm priority value.
            /// </summary>
            public ushort CachedSeverity { get; set; }

            /// <summary>
            /// Gets or sets the cached alarm message used when emitting active and cleared events.
            /// </summary>
            public string CachedMessage { get; set; } = "";

            /// <summary>
            /// Gets or sets the Galaxy tag reference for the alarm acknowledged state.
            /// </summary>
            public string AckedTagReference { get; set; } = "";

            /// <summary>
            /// Gets or sets the Galaxy tag reference for the acknowledge message that triggers acknowledgment.
            /// </summary>
            public string AckMsgTagReference { get; set; } = "";
        }

        /// <summary>
        /// Gets the mapping from OPC UA node identifiers to the Galaxy tag references used for runtime I/O.
        /// </summary>
        public IReadOnlyDictionary<string, string> NodeIdToTagReference => _nodeIdToTagReference;

        /// <summary>
        /// Gets the number of variable nodes currently published from Galaxy attributes.
        /// </summary>
        public int VariableNodeCount { get; private set; }

        /// <summary>
        /// Gets the number of non-area object nodes currently published from the Galaxy hierarchy.
        /// </summary>
        public int ObjectNodeCount { get; private set; }

        /// <summary>
        /// Gets the total number of MXAccess data change events received since startup.
        /// </summary>
        public long TotalMxChangeEvents => Interlocked.Read(ref _totalMxChangeEvents);

        /// <summary>
        /// Gets the number of items currently waiting in the dispatch queue.
        /// </summary>
        public int PendingDataChangeCount => _pendingDataChanges.Count;

        ///
        /// Gets the most recently computed MXAccess data change events per second.
///
        public double MxChangeEventsPerSecond => _lastEventsPerSecond;

        /// <summary>
        /// Gets the most recently computed average dispatch batch size (proxy for queue depth under load).
        /// </summary>
        public double AverageDispatchBatchSize => _lastAvgBatchSize;

        /// <summary>
        /// Initializes a new node manager for the Galaxy-backed OPC UA namespace.
        /// Hooks the MXAccess data-change event and starts the background dispatch thread.
        /// </summary>
        /// <param name="server">The hosting OPC UA server internals.</param>
        /// <param name="configuration">The OPC UA application configuration for the host.</param>
        /// <param name="namespaceUri">The namespace URI that identifies the Galaxy model to clients.</param>
        /// <param name="mxAccessClient">The runtime client used to service reads, writes, and subscriptions.</param>
        /// <param name="metrics">The metrics collector used to track node manager activity.</param>
        /// <param name="historianDataSource">The optional historian adapter used to satisfy OPC UA history read requests.</param>
        /// <param name="alarmTrackingEnabled">Enables alarm-condition state generation for Galaxy attributes modeled as alarms.</param>
        /// <param name="anonymousCanWrite">
        /// When <c>false</c>, value writes from identities that do not hold the AuthenticatedUser role are rejected.
        /// </param>
        /// <exception cref="ArgumentNullException"><paramref name="mxAccessClient"/> is <c>null</c>.</exception>
        public LmxNodeManager(
            IServerInternal server,
            ApplicationConfiguration configuration,
            string namespaceUri,
            IMxAccessClient mxAccessClient,
            PerformanceMetrics metrics,
            HistorianDataSource? historianDataSource = null,
            bool alarmTrackingEnabled = false,
            bool anonymousCanWrite = true)
            : base(server, configuration, namespaceUri)
        {
            // The client is dereferenced a few lines below; fail with a named argument instead of
            // a NullReferenceException from the event subscription.
            _mxAccessClient = mxAccessClient ?? throw new ArgumentNullException(nameof(mxAccessClient));

            _namespaceUri = namespaceUri;
            _metrics = metrics;
            _historianDataSource = historianDataSource;
            _alarmTrackingEnabled = alarmTrackingEnabled;
            _anonymousCanWrite = anonymousCanWrite;

            // Wire up data change delivery
            _mxAccessClient.OnTagValueChanged += OnMxAccessDataChange;

            // Start background dispatch thread
            StartDispatchThread();
        }

        /// <inheritdoc />
        public override void CreateAddressSpace(IDictionary<NodeId, IList<IReference>> externalReferences)
        {
            lock (Lock)
            {
                // Keep the dictionary handed in by the server before delegating to the base implementation.
                _externalReferences = externalReferences;
                base.CreateAddressSpace(externalReferences);
            }
        }

        /// <summary>
        /// Builds the address space from Galaxy hierarchy and attributes data. (OPC-002, OPC-003)
        /// </summary>
        /// <param name="hierarchy">The Galaxy object hierarchy that defines folders and objects in the namespace.</param>
        /// <param name="attributes">The Galaxy attributes that become OPC UA variable nodes.</param>
public void BuildAddressSpace(List<GalaxyObjectInfo> hierarchy, List<GalaxyAttributeInfo> attributes)
        {
            lock (Lock)
            {
                // Start from empty tracking state; every map below is repopulated by this method.
                _nodeIdToTagReference.Clear();
                _tagToVariableNode.Clear();
                _tagMetadata.Clear();
                _alarmInAlarmTags.Clear();
                _alarmAckedTags.Clear();
                _nodeMap.Clear();
                _gobjectToTagRefs.Clear();
                VariableNodeCount = 0;
                ObjectNodeCount = 0;

                // Topological sort: ensure parents appear before children regardless of input order
                var sorted = TopologicalSort(hierarchy);

                // Build lookup: gobject_id → list of attributes
                var attrsByObject = attributes
                    .GroupBy(a => a.GobjectId)
                    .ToDictionary(g => g.Key, g => g.ToList());

                // Root folder — enable events so alarm events propagate to clients subscribed at root
                var rootFolder = CreateFolder(null, "ZB", "ZB");
                rootFolder.NodeId = new NodeId("ZB", NamespaceIndex);
                rootFolder.EventNotifier = EventNotifiers.SubscribeToEvents;
                rootFolder.AddReference(ReferenceTypeIds.Organizes, true, ObjectIds.ObjectsFolder);
                AddPredefinedNode(SystemContext, rootFolder);

                // Add reverse reference from Objects folder → ZB root.
                // BuildAddressSpace runs after CreateAddressSpace completes, so the
                // externalReferences dict has already been consumed by the core node manager.
                // Use MasterNodeManager.AddReferences to route the reference correctly.
                Server.NodeManager.AddReferences(ObjectIds.ObjectsFolder, new List<IReference>
                {
                    new NodeStateReference(ReferenceTypeIds.Organizes, false, rootFolder.NodeId)
                });

                // Create nodes for each object in hierarchy
                foreach (var obj in sorted)
                {
                    // Objects whose parent is not (yet) known are attached directly under the root folder.
                    NodeState parentNode;
                    if (_nodeMap.TryGetValue(obj.ParentGobjectId, out var p))
                        parentNode = p;
                    else
                        parentNode = rootFolder;

                    NodeState node;
                    if (obj.IsArea)
                    {
                        // Areas → FolderType + Organizes reference
                        var folder = CreateFolder(parentNode, obj.BrowseName, obj.BrowseName);
                        folder.NodeId = new NodeId(obj.TagName, NamespaceIndex);
                        node = folder;
                    }
                    else
                    {
                        // Non-areas → BaseObjectType + HasComponent reference
                        var objNode = CreateObject(parentNode, obj.BrowseName, obj.BrowseName);
                        objNode.NodeId = new NodeId(obj.TagName, NamespaceIndex);
                        node = objNode;
                        ObjectNodeCount++;
                    }

                    AddPredefinedNode(SystemContext, node);
                    _nodeMap[obj.GobjectId] = node;

                    // Create variable nodes for this object's attributes
                    if (attrsByObject.TryGetValue(obj.GobjectId, out var objAttrs))
                    {
                        // Group by primitive_name: empty = direct child, non-empty = sub-object
                        var byPrimitive = objAttrs
                            .GroupBy(a => a.PrimitiveName ?? "")
                            .OrderBy(g => g.Key);

                        // Collect primitive group names so we know which direct attributes have children
                        var primitiveGroupNames = new HashSet<string>(
                            byPrimitive.Select(g => g.Key).Where(k => !string.IsNullOrEmpty(k)),
                            StringComparer.OrdinalIgnoreCase);

                        // Track variable nodes created for direct attributes that also have primitive children
                        var variableNodes = new Dictionary<string, BaseDataVariableState>(StringComparer.OrdinalIgnoreCase);

                        // First pass: create direct (root-level) attribute variables
                        var directGroup = byPrimitive.FirstOrDefault(g => string.IsNullOrEmpty(g.Key));
                        if (directGroup != null)
                        {
                            foreach (var attr in directGroup)
                            {
                                var variable = CreateAttributeVariable(node, attr);
                                if (primitiveGroupNames.Contains(attr.AttributeName))
                                {
                                    variableNodes[attr.AttributeName] = variable;
                                }
                            }
                        }

                        // Second pass: add primitive child attributes under the matching variable node
                        foreach (var group in byPrimitive)
                        {
                            if (string.IsNullOrEmpty(group.Key))
                                continue;

                            NodeState parentForAttrs;
                            if (variableNodes.TryGetValue(group.Key, out var existingVariable))
                            {
                                // Merge: use the existing variable node as parent
                                parentForAttrs = existingVariable;
                            }
                            else
                            {
                                // No matching dynamic attribute — create an object node
                                var primNode = CreateObject(node, group.Key, group.Key);
                                primNode.NodeId = new NodeId(obj.TagName + "." + group.Key, NamespaceIndex);
                                AddPredefinedNode(SystemContext, primNode);
                                parentForAttrs = primNode;
                            }

                            foreach (var attr in group)
                            {
                                CreateAttributeVariable(parentForAttrs, attr);
                            }
                        }
                    }
                }

                // Build alarm tracking: create AlarmConditionState for each alarm attribute
                if (_alarmTrackingEnabled)
                {
                    foreach (var obj in sorted)
                    {
                        if (obj.IsArea) continue;
                        if (!attrsByObject.TryGetValue(obj.GobjectId, out var objAttrs)) continue;

                        var hasAlarms = false;
                        var alarmAttrs = objAttrs.Where(a => a.IsAlarm && string.IsNullOrEmpty(a.PrimitiveName)).ToList();
                        foreach (var alarmAttr in alarmAttrs)
                        {
                            // Only attributes that also expose an ".InAlarm" variable get a condition node.
                            var inAlarmTagRef = alarmAttr.FullTagReference.TrimEnd('[', ']') + ".InAlarm";
                            if (!_tagToVariableNode.ContainsKey(inAlarmTagRef))
                                continue;

                            // Ordinal comparison, matching GetNodeIdentifier's handling of the "[]" suffix.
                            var alarmNodeIdStr = alarmAttr.FullTagReference.EndsWith("[]", StringComparison.Ordinal)
                                ? alarmAttr.FullTagReference.Substring(0, alarmAttr.FullTagReference.Length - 2)
                                : alarmAttr.FullTagReference;

                            // Find the source variable node for the alarm
                            _tagToVariableNode.TryGetValue(alarmAttr.FullTagReference, out var sourceVariable);
                            var sourceNodeId = new NodeId(alarmNodeIdStr, NamespaceIndex);

                            // Create AlarmConditionState attached to the source variable
                            var conditionNodeId = new NodeId(alarmNodeIdStr + ".Condition", NamespaceIndex);
                            var condition = new AlarmConditionState(sourceVariable);
                            condition.Create(
                                SystemContext,
                                conditionNodeId,
                                new QualifiedName(alarmAttr.AttributeName + "Alarm", NamespaceIndex),
                                new LocalizedText("en", alarmAttr.AttributeName + " Alarm"),
                                true);
                            condition.SourceNode.Value = sourceNodeId;
                            condition.SourceName.Value = alarmAttr.FullTagReference.TrimEnd('[', ']');
                            condition.ConditionName.Value = alarmAttr.AttributeName;
                            condition.AutoReportStateChanges = true;

                            // Set initial state: enabled, inactive, acknowledged
                            condition.SetEnableState(SystemContext, true);
                            condition.SetActiveState(SystemContext, false);
                            condition.SetAcknowledgedState(SystemContext, true);
                            condition.SetSeverity(SystemContext, EventSeverity.Medium);
                            condition.Retain.Value = false;
                            condition.OnReportEvent = (context, node, e) => Server.ReportEvent(context, e);
                            condition.OnAcknowledge = OnAlarmAcknowledge;

                            // Add HasCondition reference from source to condition
                            if (sourceVariable != null)
                            {
                                sourceVariable.AddReference(ReferenceTypeIds.HasCondition, false, conditionNodeId);
                                condition.AddReference(ReferenceTypeIds.HasCondition, true, sourceNodeId);
                            }

                            AddPredefinedNode(SystemContext, condition);

                            var baseTagRef = alarmAttr.FullTagReference.TrimEnd('[', ']');
                            var alarmInfo = new AlarmInfo
                            {
                                SourceTagReference = alarmAttr.FullTagReference,
                                SourceNodeId = sourceNodeId,
                                SourceName = alarmAttr.AttributeName,
                                ConditionNode = condition,
                                PriorityTagReference = baseTagRef + ".Priority",
                                DescAttrNameTagReference = baseTagRef + ".DescAttrName",
                                AckedTagReference = baseTagRef + ".Acked",
                                AckMsgTagReference = baseTagRef + ".AckMsg"
                            };
                            _alarmInAlarmTags[inAlarmTagRef] = alarmInfo;
                            _alarmAckedTags[alarmInfo.AckedTagReference] = alarmInfo;
                            hasAlarms = true;
                        }

                        // Enable EventNotifier on object nodes that contain alarms
                        if (hasAlarms && _nodeMap.TryGetValue(obj.GobjectId, out var objNode))
                        {
                            if (objNode is BaseObjectState objState)
                                objState.EventNotifier = EventNotifiers.SubscribeToEvents;
                            else if (objNode is FolderState folderState)
                                folderState.EventNotifier = EventNotifiers.SubscribeToEvents;
                        }
                    }
                }

                // Auto-subscribe to InAlarm tags so we detect alarm transitions
                if (_alarmTrackingEnabled)
                    SubscribeAlarmTags();

                // Snapshot the inputs so the next sync can diff against them.
                _lastHierarchy = new List<GalaxyObjectInfo>(hierarchy);
                _lastAttributes = new List<GalaxyAttributeInfo>(attributes);

                Log.Information("Address space built: {Objects} objects, {Variables} variables, {Mappings} tag references, {Alarms} alarm tags",
                    ObjectNodeCount, VariableNodeCount, _nodeIdToTagReference.Count, _alarmInAlarmTags.Count);
            }
        }

        /// <summary>
        /// Subscribes to the InAlarm, Priority, DescAttrName and Acked tags of every tracked alarm
        /// for which a variable node exists.
        /// </summary>
        private void SubscribeAlarmTags()
        {
            foreach (var kvp in _alarmInAlarmTags)
            {
                // Subscribe to InAlarm, Priority, DescAttrName, and Acked for each alarm
                var tagsToSubscribe = new[]
                {
                    kvp.Key,
                    kvp.Value.PriorityTagReference,
                    kvp.Value.DescAttrNameTagReference,
                    kvp.Value.AckedTagReference
                };

                foreach (var tag in tagsToSubscribe)
                {
                    if (string.IsNullOrEmpty(tag) || !_tagToVariableNode.ContainsKey(tag))
                        continue;

                    try
                    {
                        // The returned task is not awaited, so only failures thrown synchronously
                        // by the call are caught and logged here.
                        _mxAccessClient.SubscribeAsync(tag, (_, _) => { });
                    }
                    catch (Exception ex)
                    {
                        Log.Warning(ex, "Failed to auto-subscribe to alarm tag {Tag}", tag);
                    }
                }
            }
        }

        /// <summary>
        /// Acknowledge callback installed on every alarm condition: forwards the operator comment to
        /// the alarm's AckMsg tag through MXAccess.
        /// </summary>
        /// <param name="context">The system context of the acknowledge call (unused).</param>
        /// <param name="condition">The condition node being acknowledged.</param>
        /// <param name="eventId">The event id supplied by the client (unused).</param>
        /// <param name="comment">The operator comment; a null comment is written as an empty string.</param>
        /// <returns>
        /// <c>Good</c> when the AckMsg write succeeded, <c>BadNodeIdUnknown</c> when the condition is not
        /// tracked, and <c>BadInternalError</c> when the write failed or threw.
        /// </returns>
        private ServiceResult OnAlarmAcknowledge(
            ISystemContext context,
            ConditionState condition,
            byte[] eventId,
            LocalizedText comment)
        {
            var alarmInfo = _alarmInAlarmTags.Values
                .FirstOrDefault(a => a.ConditionNode == condition);
            if (alarmInfo == null)
                return new ServiceResult(StatusCodes.BadNodeIdUnknown);

            try
            {
                var ackMessage = comment?.Text ?? "";
                var written = _mxAccessClient.WriteAsync(alarmInfo.AckMsgTagReference, ackMessage)
                    .GetAwaiter().GetResult();

                // WriteAsync reports rejection through its boolean result (the Value write path checks
                // it the same way); a rejected write must not be reported to the client as Good.
                if (!written)
                {
                    Log.Warning("AckMsg write was rejected for {Source}", alarmInfo.SourceName);
                    return new ServiceResult(StatusCodes.BadInternalError);
                }

                Log.Information("Alarm acknowledge sent: {Source} (Message={AckMsg})",
                    alarmInfo.SourceName, ackMessage);
                return ServiceResult.Good;
            }
            catch (Exception ex)
            {
                Log.Warning(ex, "Failed to write AckMsg for {Source}", alarmInfo.SourceName);
                return new ServiceResult(StatusCodes.BadInternalError);
            }
        }

        /// <summary>
        /// Updates the alarm's condition node for an active/cleared transition and reports the event
        /// through the source variable's parent and through the server.
        /// </summary>
        /// <param name="info">The tracked alarm; nothing happens when it has no condition node.</param>
        /// <param name="active"><c>true</c> for an alarm-active transition, <c>false</c> for cleared.</param>
        private void ReportAlarmEvent(AlarmInfo info, bool active)
        {
            var condition = info.ConditionNode;
            if (condition == null)
                return;

            ushort severity = info.CachedSeverity;
            string message = active
                ? (!string.IsNullOrEmpty(info.CachedMessage) ? info.CachedMessage : $"Alarm active: {info.SourceName}")
                : $"Alarm cleared: {info.SourceName}";

            // Set a new EventId so clients can reference this event for acknowledge
            condition.EventId.Value = Guid.NewGuid().ToByteArray();
            condition.SetActiveState(SystemContext, active);
            condition.Message.Value = new LocalizedText("en", message);
            condition.SetSeverity(SystemContext, (EventSeverity)severity);

            // Retain while active or unacknowledged
            condition.Retain.Value = active || (condition.AckedState?.Id?.Value == false);

            // Reset acknowledged state when alarm activates
            if (active)
                condition.SetAcknowledgedState(SystemContext, false);

            // Report through the source node hierarchy so events reach subscribers on parent objects
            if (_tagToVariableNode.TryGetValue(info.SourceTagReference, out var sourceVar) && sourceVar.Parent != null)
            {
                sourceVar.Parent.ReportEvent(SystemContext, condition);
            }

            // Also report to Server node for clients subscribed at server level
            Server.ReportEvent(SystemContext, condition);

            Log.Information("Alarm {State}: {Source} (Severity={Severity}, Message={Message})",
                active ? "ACTIVE" : "CLEARED", info.SourceName, severity, message);
        }

        /// <summary>
        /// Rebuilds the address space, removing old nodes and creating new ones. (OPC-010)
        /// Delegates to <see cref="SyncAddressSpace"/>.
        /// </summary>
        /// <param name="hierarchy">The latest Galaxy object hierarchy to publish.</param>
        /// <param name="attributes">The latest Galaxy attributes to publish.</param>
        public void RebuildAddressSpace(List<GalaxyObjectInfo> hierarchy, List<GalaxyAttributeInfo> attributes)
        {
            SyncAddressSpace(hierarchy, attributes);
        }

        /// <summary>
        /// Incrementally syncs the address space by detecting changed gobjects and rebuilding only those subtrees. (OPC-010)
        /// </summary>
        /// <param name="hierarchy">The latest Galaxy object hierarchy snapshot to compare against the currently published model.</param>
        /// <param name="attributes">The latest Galaxy attribute snapshot to compare against the currently published variables.</param>
public void SyncAddressSpace(List<GalaxyObjectInfo> hierarchy, List<GalaxyAttributeInfo> attributes)
        {
            lock (Lock)
            {
                if (_lastHierarchy == null || _lastAttributes == null)
                {
                    Log.Information("No previous state cached — performing full build");
                    BuildAddressSpace(hierarchy, attributes);
                    return;
                }

                var changedIds = AddressSpaceDiff.FindChangedGobjectIds(
                    _lastHierarchy, _lastAttributes, hierarchy, attributes);

                if (changedIds.Count == 0)
                {
                    Log.Information("No address space changes detected");
                    // Snapshot copies, as every other path does, so a caller that later mutates its
                    // lists cannot alter the baseline used by the next diff.
                    _lastHierarchy = new List<GalaxyObjectInfo>(hierarchy);
                    _lastAttributes = new List<GalaxyAttributeInfo>(attributes);
                    return;
                }

                // Expand to include child subtrees in both old and new hierarchy
                changedIds = AddressSpaceDiff.ExpandToSubtrees(changedIds, _lastHierarchy);
                changedIds = AddressSpaceDiff.ExpandToSubtrees(changedIds, hierarchy);

                Log.Information("Incremental sync: {Count} gobjects changed out of {Total}",
                    changedIds.Count, hierarchy.Count);

                // Snapshot subscriptions for changed tags before teardown
                var affectedSubscriptions = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase);
                foreach (var id in changedIds)
                {
                    if (_gobjectToTagRefs.TryGetValue(id, out var tagRefs))
                    {
                        foreach (var tagRef in tagRefs)
                        {
                            if (_subscriptionRefCounts.TryGetValue(tagRef, out var count))
                                affectedSubscriptions[tagRef] = count;
                        }
                    }
                }

                // Tear down changed subtrees
                TearDownGobjects(changedIds);

                // Rebuild changed subtrees from new data
                var changedHierarchy = hierarchy.Where(h => changedIds.Contains(h.GobjectId)).ToList();
                var changedAttributes = attributes.Where(a => changedIds.Contains(a.GobjectId)).ToList();
                BuildSubtree(changedHierarchy, changedAttributes);

                // Restore subscriptions for surviving tags
                foreach (var kvp in affectedSubscriptions)
                {
                    if (!_tagToVariableNode.ContainsKey(kvp.Key))
                        continue;

                    try
                    {
                        _mxAccessClient.SubscribeAsync(kvp.Key, (_, _) => { }).GetAwaiter().GetResult();
                        _subscriptionRefCounts[kvp.Key] = kvp.Value;
                    }
                    catch (Exception ex)
                    {
                        Log.Warning(ex, "Failed to restore subscription for {TagRef} after sync", kvp.Key);
                    }
                }

                _lastHierarchy = new List<GalaxyObjectInfo>(hierarchy);
                _lastAttributes = new List<GalaxyAttributeInfo>(attributes);

                Log.Information("Incremental sync complete: {Objects} objects, {Variables} variables, {Alarms} alarms",
                    ObjectNodeCount, VariableNodeCount, _alarmInAlarmTags.Count);
            }
        }

        /// <summary>
        /// Removes the nodes, subscriptions, alarm tracking entries and lookup-table entries that belong
        /// to the given gobjects. Failures of individual unsubscribe/delete calls are logged and skipped.
        /// </summary>
        /// <param name="gobjectIds">The gobject ids whose published nodes are removed.</param>
        private void TearDownGobjects(HashSet<int> gobjectIds)
        {
            foreach (var id in gobjectIds)
            {
                // Remove variable nodes and their tracking data
                if (_gobjectToTagRefs.TryGetValue(id, out var tagRefs))
                {
                    foreach (var tagRef in tagRefs.ToList())
                    {
                        // Unsubscribe if actively subscribed
                        if (_subscriptionRefCounts.ContainsKey(tagRef))
                        {
                            UnsubscribeDuringTeardown(tagRef);
                            _subscriptionRefCounts.Remove(tagRef);
                        }

                        // Remove alarm tracking for alarms whose source is this tag
                        var alarmKeysToRemove = _alarmInAlarmTags
                            .Where(kvp => kvp.Value.SourceTagReference == tagRef)
                            .Select(kvp => kvp.Key)
                            .ToList();
                        foreach (var alarmKey in alarmKeysToRemove)
                        {
                            var info = _alarmInAlarmTags[alarmKey];

                            // Unsubscribe alarm auto-subscriptions. The Acked tag is included because
                            // the alarm subscribe paths subscribe it as well.
                            foreach (var alarmTag in new[]
                                     {
                                         alarmKey,
                                         info.PriorityTagReference,
                                         info.DescAttrNameTagReference,
                                         info.AckedTagReference
                                     })
                            {
                                if (!string.IsNullOrEmpty(alarmTag))
                                    UnsubscribeDuringTeardown(alarmTag);
                            }

                            // The condition node was registered with AddPredefinedNode but never added
                            // as a child of the source variable, so it has to be deleted explicitly.
                            if (info.ConditionNode != null)
                                DeleteNodeDuringTeardown(info.ConditionNode.NodeId);

                            _alarmInAlarmTags.Remove(alarmKey);
                            if (!string.IsNullOrEmpty(info.AckedTagReference))
                                _alarmAckedTags.Remove(info.AckedTagReference);
                        }

                        // Delete variable node
                        if (_tagToVariableNode.TryGetValue(tagRef, out var variable))
                        {
                            DeleteNodeDuringTeardown(variable.NodeId);
                            _tagToVariableNode.Remove(tagRef);
                        }

                        // Clean up remaining mappings. The variable's own identifier is the key it was
                        // registered under; fall back to a reverse scan when that entry is missing
                        // or maps to a different tag.
                        var nodeIdKey = variable?.NodeId?.Identifier as string;
                        if (nodeIdKey == null
                            || !_nodeIdToTagReference.TryGetValue(nodeIdKey, out var mappedTagRef)
                            || mappedTagRef != tagRef)
                        {
                            nodeIdKey = _nodeIdToTagReference.FirstOrDefault(kvp => kvp.Value == tagRef).Key;
                        }

                        if (nodeIdKey != null)
                            _nodeIdToTagReference.Remove(nodeIdKey);

                        _tagMetadata.Remove(tagRef);
                        VariableNodeCount--;
                    }

                    _gobjectToTagRefs.Remove(id);
                }

                // Delete the object/folder node itself
                if (_nodeMap.TryGetValue(id, out var objNode))
                {
                    DeleteNodeDuringTeardown(objNode.NodeId);
                    _nodeMap.Remove(id);
                    if (!(objNode is FolderState))
                        ObjectNodeCount--;
                }
            }
        }

        /// <summary>
        /// Best-effort MXAccess unsubscribe used during teardown; a failure is logged and ignored.
        /// </summary>
        private void UnsubscribeDuringTeardown(string tagRef)
        {
            try
            {
                _mxAccessClient.UnsubscribeAsync(tagRef).GetAwaiter().GetResult();
            }
            catch (Exception ex)
            {
                Log.Debug(ex, "Ignoring unsubscribe failure for {TagRef} during teardown", tagRef);
            }
        }

        /// <summary>
        /// Best-effort node deletion used during teardown; a failure is logged and ignored.
        /// </summary>
        private void DeleteNodeDuringTeardown(NodeId nodeId)
        {
            try
            {
                DeleteNode(SystemContext, nodeId);
            }
            catch (Exception ex)
            {
                Log.Debug(ex, "Ignoring node delete failure for {NodeId} during teardown", nodeId);
            }
        }

        /// <summary>
        /// Creates object, variable and (when alarm tracking is enabled) alarm condition nodes for the
        /// given subset of the hierarchy, attaching each object to its parent from the node map or,
        /// failing that, to the "ZB" root folder.
        /// </summary>
        /// <param name="hierarchy">The gobjects to (re)create.</param>
        /// <param name="attributes">The attributes belonging to those gobjects.</param>
        private void BuildSubtree(List<GalaxyObjectInfo> hierarchy, List<GalaxyAttributeInfo> attributes)
        {
            if (hierarchy.Count == 0)
                return;

            var sorted = TopologicalSort(hierarchy);
            var attrsByObject = attributes
                .GroupBy(a => a.GobjectId)
                .ToDictionary(g => g.Key, g => g.ToList());

            // Find root folder for orphaned nodes
            NodeState? rootFolder = null;
            if (PredefinedNodes.TryGetValue(new NodeId("ZB", NamespaceIndex), out var rootNode))
                rootFolder = rootNode;

            foreach (var obj in sorted)
            {
                NodeState parentNode;
                if (_nodeMap.TryGetValue(obj.ParentGobjectId, out var p))
                    parentNode = p;
                else if (rootFolder != null)
                    parentNode = rootFolder;
                else
                    continue; // no parent available

                // Create node with final NodeId before adding to parent
                NodeState node;
                var nodeId = new NodeId(obj.TagName, NamespaceIndex);
                if (obj.IsArea)
                {
                    var folder = new FolderState(parentNode)
                    {
                        SymbolicName = obj.BrowseName,
                        ReferenceTypeId = ReferenceTypes.Organizes,
                        TypeDefinitionId = ObjectTypeIds.FolderType,
                        NodeId = nodeId,
                        BrowseName = new QualifiedName(obj.BrowseName, NamespaceIndex),
                        DisplayName = new LocalizedText("en", obj.BrowseName),
                        WriteMask = AttributeWriteMask.None,
                        UserWriteMask = AttributeWriteMask.None,
                        EventNotifier = EventNotifiers.None
                    };
                    parentNode.AddChild(folder);
                    node = folder;
                }
                else
                {
                    var objNode = new BaseObjectState(parentNode)
                    {
                        SymbolicName = obj.BrowseName,
                        ReferenceTypeId = ReferenceTypes.HasComponent,
                        TypeDefinitionId = ObjectTypeIds.BaseObjectType,
                        NodeId = nodeId,
                        BrowseName = new QualifiedName(obj.BrowseName, NamespaceIndex),
                        DisplayName = new LocalizedText("en", obj.BrowseName),
                        WriteMask = AttributeWriteMask.None,
                        UserWriteMask = AttributeWriteMask.None,
                        EventNotifier = EventNotifiers.None
                    };
                    parentNode.AddChild(objNode);
                    node = objNode;
                    ObjectNodeCount++;
                }

                AddPredefinedNode(SystemContext, node);
                _nodeMap[obj.GobjectId] = node;
                parentNode.ClearChangeMasks(SystemContext, false);

                // Create variable nodes (same logic as BuildAddressSpace)
                if (attrsByObject.TryGetValue(obj.GobjectId, out var objAttrs))
                {
                    var byPrimitive = objAttrs
                        .GroupBy(a => a.PrimitiveName ?? "")
                        .OrderBy(g => g.Key);

                    var primitiveGroupNames = new HashSet<string>(
                        byPrimitive.Select(g => g.Key).Where(k => !string.IsNullOrEmpty(k)),
                        StringComparer.OrdinalIgnoreCase);

                    var variableNodes = new Dictionary<string, BaseDataVariableState>(StringComparer.OrdinalIgnoreCase);

                    // Direct attributes first, remembering those that also have primitive children.
                    var directGroup = byPrimitive.FirstOrDefault(g => string.IsNullOrEmpty(g.Key));
                    if (directGroup != null)
                    {
                        foreach (var attr in directGroup)
                        {
                            var variable = CreateAttributeVariable(node, attr);
                            if (primitiveGroupNames.Contains(attr.AttributeName))
                                variableNodes[attr.AttributeName] = variable;
                        }
                    }

                    // Primitive attributes go under the matching variable, or under a new object node.
                    foreach (var group in byPrimitive)
                    {
                        if (string.IsNullOrEmpty(group.Key))
                            continue;

                        NodeState parentForAttrs;
                        if (variableNodes.TryGetValue(group.Key, out var existingVariable))
                        {
                            parentForAttrs = existingVariable;
                        }
                        else
                        {
                            var primNode = CreateObject(node, group.Key, group.Key);
                            primNode.NodeId = new NodeId(obj.TagName + "." + group.Key, NamespaceIndex);
                            AddPredefinedNode(SystemContext, primNode);
                            parentForAttrs = primNode;
                        }

                        foreach (var attr in group)
                        {
                            CreateAttributeVariable(parentForAttrs, attr);
                        }
                    }
                }
            }

            // Alarm tracking for the new subtree
            if (_alarmTrackingEnabled)
            {
                // Alarms created by this call, keyed by InAlarm tag, so only they are subscribed below.
                var newAlarms = new Dictionary<string, AlarmInfo>(StringComparer.OrdinalIgnoreCase);

                foreach (var obj in sorted)
                {
                    if (obj.IsArea) continue;
                    if (!attrsByObject.TryGetValue(obj.GobjectId, out var objAttrs)) continue;

                    var hasAlarms = false;
                    var alarmAttrs = objAttrs.Where(a => a.IsAlarm && string.IsNullOrEmpty(a.PrimitiveName)).ToList();
                    foreach (var alarmAttr in alarmAttrs)
                    {
                        var inAlarmTagRef = alarmAttr.FullTagReference.TrimEnd('[', ']') + ".InAlarm";
                        if (!_tagToVariableNode.ContainsKey(inAlarmTagRef))
                            continue;

                        var alarmNodeIdStr = alarmAttr.FullTagReference.EndsWith("[]", StringComparison.Ordinal)
                            ? alarmAttr.FullTagReference.Substring(0, alarmAttr.FullTagReference.Length - 2)
                            : alarmAttr.FullTagReference;

                        _tagToVariableNode.TryGetValue(alarmAttr.FullTagReference, out var sourceVariable);
                        var sourceNodeId = new NodeId(alarmNodeIdStr, NamespaceIndex);

                        var conditionNodeId = new NodeId(alarmNodeIdStr + ".Condition", NamespaceIndex);
                        var condition = new AlarmConditionState(sourceVariable);
                        condition.Create(
                            SystemContext,
                            conditionNodeId,
                            new QualifiedName(alarmAttr.AttributeName + "Alarm", NamespaceIndex),
                            new LocalizedText("en", alarmAttr.AttributeName + " Alarm"),
                            true);
                        condition.SourceNode.Value = sourceNodeId;
                        condition.SourceName.Value = alarmAttr.FullTagReference.TrimEnd('[', ']');
                        condition.ConditionName.Value = alarmAttr.AttributeName;
                        condition.AutoReportStateChanges = true;

                        // Initial state: enabled, inactive, acknowledged
                        condition.SetEnableState(SystemContext, true);
                        condition.SetActiveState(SystemContext, false);
                        condition.SetAcknowledgedState(SystemContext, true);
                        condition.SetSeverity(SystemContext, EventSeverity.Medium);
                        condition.Retain.Value = false;
                        condition.OnReportEvent = (context, n, e) => Server.ReportEvent(context, e);
                        condition.OnAcknowledge = OnAlarmAcknowledge;

                        if (sourceVariable != null)
                        {
                            sourceVariable.AddReference(ReferenceTypeIds.HasCondition, false, conditionNodeId);
                            condition.AddReference(ReferenceTypeIds.HasCondition, true, sourceNodeId);
                        }

                        AddPredefinedNode(SystemContext, condition);

                        var baseTagRef = alarmAttr.FullTagReference.TrimEnd('[', ']');
                        var alarmInfo = new AlarmInfo
                        {
                            SourceTagReference = alarmAttr.FullTagReference,
                            SourceNodeId = sourceNodeId,
                            SourceName = alarmAttr.AttributeName,
                            ConditionNode = condition,
                            PriorityTagReference = baseTagRef + ".Priority",
                            DescAttrNameTagReference = baseTagRef + ".DescAttrName",
                            AckedTagReference = baseTagRef + ".Acked",
                            AckMsgTagReference = baseTagRef + ".AckMsg"
                        };
                        _alarmInAlarmTags[inAlarmTagRef] = alarmInfo;
                        _alarmAckedTags[alarmInfo.AckedTagReference] = alarmInfo;
                        newAlarms[inAlarmTagRef] = alarmInfo;
                        hasAlarms = true;
                    }

                    if (hasAlarms && _nodeMap.TryGetValue(obj.GobjectId, out var objNode))
                    {
                        if (objNode is BaseObjectState objState)
                            objState.EventNotifier = EventNotifiers.SubscribeToEvents;
                        else if (objNode is FolderState folderState)
                            folderState.EventNotifier = EventNotifiers.SubscribeToEvents;
                    }
                }

                // Subscribe alarm tags for the new subtree only. The Acked tag is subscribed as well,
                // matching what the full build does through SubscribeAlarmTags.
                foreach (var kvp in newAlarms)
                {
                    var tagsToSubscribe = new[]
                    {
                        kvp.Key,
                        kvp.Value.PriorityTagReference,
                        kvp.Value.DescAttrNameTagReference,
                        kvp.Value.AckedTagReference
                    };

                    foreach (var tag in tagsToSubscribe)
                    {
                        if (string.IsNullOrEmpty(tag) || !_tagToVariableNode.ContainsKey(tag))
                            continue;

                        try
                        {
                            // Not awaited: only failures thrown synchronously by the call are caught.
                            _mxAccessClient.SubscribeAsync(tag, (_, _) => { });
                        }
                        catch (Exception ex)
                        {
                            Log.Warning(ex, "Failed to auto-subscribe to alarm tag {Tag}", tag);
                        }
                    }
                }
            }
        }

        /// <summary>
        /// Sorts hierarchy so parents always appear before children, regardless of input order.
        /// </summary>
        /// <param name="hierarchy">The objects to order; gobject ids must be unique.</param>
        /// <returns>A new list containing every input object, each after its parent when the parent is in the input.</returns>
        private static List<GalaxyObjectInfo> TopologicalSort(List<GalaxyObjectInfo> hierarchy)
        {
            var byId = hierarchy.ToDictionary(h => h.GobjectId);
            var visited = new HashSet<int>();
            var result = new List<GalaxyObjectInfo>(hierarchy.Count);

            void Visit(GalaxyObjectInfo obj)
            {
                // Marking before recursing also stops a parent cycle from looping forever.
                if (!visited.Add(obj.GobjectId))
                    return;

                // Visit parent first if it exists in the hierarchy
                if (byId.TryGetValue(obj.ParentGobjectId, out var parent))
                    Visit(parent);

                result.Add(obj);
            }

            foreach (var obj in hierarchy)
                Visit(obj);

            return result;
        }

        /// <summary>
        /// Creates and registers the variable node for one Galaxy attribute and records it in the
        /// node-id, tag, metadata and gobject lookup tables.
        /// </summary>
        /// <param name="parent">The node the variable is added under.</param>
        /// <param name="attr">The Galaxy attribute to publish.</param>
        /// <returns>The variable node that was created.</returns>
        private BaseDataVariableState CreateAttributeVariable(NodeState parent, GalaxyAttributeInfo attr)
        {
            var opcUaDataTypeId = MxDataTypeMapper.MapToOpcUaDataType(attr.MxDataType);
            var variable = CreateVariable(
                parent,
                attr.AttributeName,
                attr.AttributeName,
                new NodeId(opcUaDataTypeId),
                attr.IsArray ? ValueRanks.OneDimension : ValueRanks.Scalar);

            var nodeIdString = GetNodeIdentifier(attr);
            variable.NodeId = new NodeId(nodeIdString, NamespaceIndex);

            if (attr.IsArray && attr.ArrayDimension.HasValue)
            {
                variable.ArrayDimensions = new ReadOnlyList<uint>(
                    new List<uint> { (uint)attr.ArrayDimension.Value });
            }

            var accessLevel = SecurityClassificationMapper.IsWritable(attr.SecurityClassification)
                ? AccessLevels.CurrentReadOrWrite
                : AccessLevels.CurrentRead;
            if (attr.IsHistorized)
            {
                accessLevel |= AccessLevels.HistoryRead;
            }

            variable.AccessLevel = accessLevel;
            variable.UserAccessLevel = accessLevel;
            variable.Historizing = attr.IsHistorized;

            // No runtime value yet: publish the normalized placeholder with a "waiting" status.
            variable.Value = NormalizePublishedValue(attr.FullTagReference, null);
            variable.StatusCode = StatusCodes.BadWaitingForInitialData;
            variable.Timestamp = DateTime.UtcNow;

            AddPredefinedNode(SystemContext, variable);

            _nodeIdToTagReference[nodeIdString] = attr.FullTagReference;
            _tagToVariableNode[attr.FullTagReference] = variable;
            _tagMetadata[attr.FullTagReference] = new TagMetadata
            {
                MxDataType = attr.MxDataType,
                IsArray = attr.IsArray,
                ArrayDimension = attr.ArrayDimension
            };

            // Track gobject → tag references for incremental sync
            if (!_gobjectToTagRefs.TryGetValue(attr.GobjectId, out var tagList))
            {
                tagList = new List<string>();
                _gobjectToTagRefs[attr.GobjectId] = tagList;
            }

            tagList.Add(attr.FullTagReference);

            VariableNodeCount++;
            return variable;
        }

        /// <summary>
        /// Returns the node identifier string for an attribute: the full tag reference, with a
        /// trailing "[]" removed for array attributes.
        /// </summary>
        private static string GetNodeIdentifier(GalaxyAttributeInfo attr)
        {
            if (!attr.IsArray)
                return attr.FullTagReference;

            return attr.FullTagReference.EndsWith("[]", StringComparison.Ordinal)
                ? attr.FullTagReference.Substring(0, attr.FullTagReference.Length - 2)
                : attr.FullTagReference;
        }

        /// <summary>
        /// Creates a FolderType node (Organizes reference) and adds it to <paramref name="parent"/> when one is given.
        /// </summary>
        /// <param name="parent">The parent node, or <c>null</c> for a node without a parent.</param>
        /// <param name="path">The string identifier used for the node id.</param>
        /// <param name="name">The symbolic, browse and display name.</param>
        private FolderState CreateFolder(NodeState? parent, string path, string name)
        {
            var folder = new FolderState(parent)
            {
                SymbolicName = name,
                ReferenceTypeId = ReferenceTypes.Organizes,
                TypeDefinitionId = ObjectTypeIds.FolderType,
                NodeId = new NodeId(path, NamespaceIndex),
                BrowseName = new QualifiedName(name, NamespaceIndex),
                DisplayName = new LocalizedText("en", name),
                WriteMask = AttributeWriteMask.None,
                UserWriteMask = AttributeWriteMask.None,
                EventNotifier = EventNotifiers.None
            };

            parent?.AddChild(folder);
            return folder;
        }

        /// <summary>
        /// Creates a BaseObjectType node (HasComponent reference) and adds it to <paramref name="parent"/>.
        /// </summary>
        /// <param name="parent">The parent node.</param>
        /// <param name="path">The string identifier used for the node id.</param>
        /// <param name="name">The symbolic, browse and display name.</param>
        private BaseObjectState CreateObject(NodeState parent, string path, string name)
        {
            var obj = new BaseObjectState(parent)
            {
                SymbolicName = name,
                ReferenceTypeId = ReferenceTypes.HasComponent,
                TypeDefinitionId = ObjectTypeIds.BaseObjectType,
                NodeId = new NodeId(path, NamespaceIndex),
                BrowseName = new QualifiedName(name, NamespaceIndex),
                DisplayName = new LocalizedText("en", name),
                WriteMask = AttributeWriteMask.None,
                UserWriteMask = AttributeWriteMask.None,
                EventNotifier = EventNotifiers.None
            };

            parent.AddChild(obj);
            return obj;
        }

        /// <summary>
        /// Creates a BaseDataVariableType node (HasComponent reference) with read/write access and a
        /// Good status, and adds it to <paramref name="parent"/>.
        /// </summary>
        /// <param name="parent">The parent node.</param>
        /// <param name="path">The string identifier used for the node id.</param>
        /// <param name="name">The symbolic, browse and display name.</param>
        /// <param name="dataType">The OPC UA data type node id.</param>
        /// <param name="valueRank">The OPC UA value rank.</param>
        private BaseDataVariableState CreateVariable(NodeState parent, string path, string name, NodeId dataType, int valueRank)
        {
            var variable = new BaseDataVariableState(parent)
            {
                SymbolicName = name,
                ReferenceTypeId = ReferenceTypes.HasComponent,
                TypeDefinitionId = VariableTypeIds.BaseDataVariableType,
                NodeId = new NodeId(path, NamespaceIndex),
                BrowseName = new QualifiedName(name, NamespaceIndex),
                DisplayName = new LocalizedText("en", name),
                WriteMask = AttributeWriteMask.None,
                UserWriteMask = AttributeWriteMask.None,
                DataType = dataType,
                ValueRank = valueRank,
                AccessLevel = AccessLevels.CurrentReadOrWrite,
                UserAccessLevel = AccessLevels.CurrentReadOrWrite,
                Historizing = false,
                StatusCode = StatusCodes.Good,
                Timestamp = DateTime.UtcNow
            };

            parent.AddChild(variable);
            return variable;
        }

        #region Read/Write Handlers

        /// <inheritdoc />
        public override void Read(OperationContext context, double maxAge, IList<ReadValueId> nodesToRead, IList<DataValue> results, IList<ServiceResult> errors)
        {
            base.Read(context, maxAge, nodesToRead, results, errors);

            // After the base pass, replace the result of every Value read on a mapped node with a
            // live read through MXAccess.
            for (int i = 0; i < nodesToRead.Count; i++)
            {
                if (nodesToRead[i].AttributeId != Attributes.Value)
                    continue;

                var nodeId = nodesToRead[i].NodeId;
                if (nodeId.NamespaceIndex != NamespaceIndex)
                    continue;

                var nodeIdStr = nodeId.Identifier as string;
                if (nodeIdStr == null)
                    continue;

                if (_nodeIdToTagReference.TryGetValue(nodeIdStr, out var tagRef))
                {
                    try
                    {
                        var vtq = _mxAccessClient.ReadAsync(tagRef).GetAwaiter().GetResult();
                        results[i] = CreatePublishedDataValue(tagRef, vtq);
                        errors[i] = ServiceResult.Good;
                    }
                    catch (Exception ex)
                    {
                        Log.Warning(ex, "Read failed for {TagRef}", tagRef);
                        errors[i] = new ServiceResult(StatusCodes.BadInternalError);
                    }
                }
            }
        }

        /// <inheritdoc />
        public override void Write(OperationContext context, IList<WriteValue> nodesToWrite, IList<ServiceResult> errors)
        {
            // NOTE(review): the base call runs before the role check below, so a write that is then
            // rejected has already been processed by the base implementation — confirm this is acceptable.
            base.Write(context, nodesToWrite, errors);

            for (int i = 0; i < nodesToWrite.Count; i++)
            {
                if (nodesToWrite[i].AttributeId != Attributes.Value)
                    continue;

                // Skip if base rejected due to access level (read-only node)
                if (errors[i] != null && errors[i].StatusCode == StatusCodes.BadNotWritable)
                    continue;

                // Filter by namespace before touching errors[i]: entries outside this namespace are
                // not ours to reject, so the role check below must never overwrite their result.
                var nodeId = nodesToWrite[i].NodeId;
                if (nodeId.NamespaceIndex != NamespaceIndex)
                    continue;

                // Enforce role-based write access: reject anonymous writes when AnonymousCanWrite is false.
                // NOTE(review): a null identity or null role list skips this check (fail-open) — confirm intended.
                if (!_anonymousCanWrite && context.UserIdentity?.GrantedRoleIds != null &&
                    !context.UserIdentity.GrantedRoleIds.Contains(ObjectIds.WellKnownRole_AuthenticatedUser))
                {
                    errors[i] = new ServiceResult(StatusCodes.BadUserAccessDenied);
                    continue;
                }

                var nodeIdStr = nodeId.Identifier as string;
                if (nodeIdStr == null)
                    continue;

                if (_nodeIdToTagReference.TryGetValue(nodeIdStr, out var tagRef))
                {
                    try
                    {
                        var writeValue = nodesToWrite[i];
                        var value = writeValue.Value.WrappedValue.Value;

                        // An index range turns the write into a read-modify-write of the whole array.
                        if (!string.IsNullOrWhiteSpace(writeValue.IndexRange))
                        {
                            if (!TryApplyArrayElementWrite(tagRef, value, writeValue.IndexRange, out var updatedArray))
                            {
                                errors[i] = new ServiceResult(StatusCodes.BadIndexRangeInvalid);
                                continue;
                            }

                            value = updatedArray;
                        }

                        var success = _mxAccessClient.WriteAsync(tagRef, value).GetAwaiter().GetResult();
                        if (success)
                        {
                            PublishLocalWrite(tagRef, value);
                            errors[i] = ServiceResult.Good;
                        }
                        else
                        {
                            errors[i] = new ServiceResult(StatusCodes.BadInternalError);
                        }
                    }
                    catch (Exception ex)
                    {
                        Log.Warning(ex, "Write failed for {TagRef}", tagRef);
                        errors[i] = new ServiceResult(StatusCodes.BadInternalError);
                    }
                }
            }
        }

        /// <summary>
        /// Builds the array to write for a single-element indexed write: reads the current array,
        /// clones it and replaces the element at the given index.
        /// </summary>
        /// <param name="tagRef">The tag whose current array value is read.</param>
        /// <param name="writeValue">The new element value (a one-element array is unwrapped).</param>
        /// <param name="indexRange">The index range text; only a single non-negative integer is accepted.</param>
        /// <param name="updatedArray">The cloned array with the element replaced; only meaningful when <c>true</c> is returned.</param>
        /// <returns>
        /// <c>false</c> when the range is not a single valid index, the current value is not a
        /// one-dimensional array, or the index is out of bounds; otherwise <c>true</c>.
        /// </returns>
        private bool TryApplyArrayElementWrite(string tagRef, object? writeValue, string indexRange, out object updatedArray)
        {
            updatedArray = null!;

            // Parse with the invariant culture so the accepted text does not vary with host regional settings.
            if (!int.TryParse(
                    indexRange,
                    System.Globalization.NumberStyles.Integer,
                    System.Globalization.CultureInfo.InvariantCulture,
                    out var index) || index < 0)
                return false;

            var currentValue = NormalizePublishedValue(tagRef, _mxAccessClient.ReadAsync(tagRef).GetAwaiter().GetResult().Value);
            if (currentValue is not Array currentArray || currentArray.Rank != 1 || index >= currentArray.Length)
                return false;

            var nextArray = (Array)currentArray.Clone();
            var elementType = currentArray.GetType().GetElementType();
            if (elementType == null)
                return false;

            var normalizedValue = NormalizeIndexedWriteValue(writeValue);
            nextArray.SetValue(ConvertArrayElementValue(normalizedValue, elementType), index);
            updatedArray = nextArray;
            return true;
        }

        /// <summary>
        /// Unwraps a one-element array to its single element; any other value is returned unchanged.
        /// </summary>
        private static object? NormalizeIndexedWriteValue(object? value)
        {
            if (value is Array array && array.Length == 1)
                return array.GetValue(0);
            return value;
        }

        /// <summary>
        /// Converts a written value to the element type of the target array.
        /// </summary>
        /// <param name="value">The value to convert; <c>null</c> becomes the element type's default.</param>
        /// <param name="elementType">The element type of the target array.</param>
        /// <returns>The value as an instance assignable to <paramref name="elementType"/>.</returns>
        private static object? ConvertArrayElementValue(object? value, Type elementType)
        {
            if (value == null)
            {
                if (elementType.IsValueType)
                    return Activator.CreateInstance(elementType);
                return null;
            }

            if (elementType.IsInstanceOfType(value))
                return value;

            // Invariant culture keeps number/text conversions independent of host regional settings.
            var invariant = System.Globalization.CultureInfo.InvariantCulture;
            if (elementType == typeof(string))
                return Convert.ToString(value, invariant);

            return Convert.ChangeType(value, elementType, invariant);
        }

        private void PublishLocalWrite(string tagRef, object?
value) { if (!_tagToVariableNode.TryGetValue(tagRef, out var variable)) return; var dataValue = CreatePublishedDataValue(tagRef, Vtq.Good(value)); variable.Value = dataValue.Value; variable.StatusCode = dataValue.StatusCode; variable.Timestamp = dataValue.SourceTimestamp; variable.ClearChangeMasks(SystemContext, false); } private DataValue CreatePublishedDataValue(string tagRef, Vtq vtq) { var normalizedValue = NormalizePublishedValue(tagRef, vtq.Value); if (ReferenceEquals(normalizedValue, vtq.Value)) return DataValueConverter.FromVtq(vtq); return DataValueConverter.FromVtq(new Vtq(normalizedValue, vtq.Timestamp, vtq.Quality)); } private object? NormalizePublishedValue(string tagRef, object? value) { if (value != null) return value; if (!_tagMetadata.TryGetValue(tagRef, out var metadata) || !metadata.IsArray || !metadata.ArrayDimension.HasValue) return null; return CreateDefaultArrayValue(metadata); } private static Array CreateDefaultArrayValue(TagMetadata metadata) { var elementType = MxDataTypeMapper.MapToClrType(metadata.MxDataType); var values = Array.CreateInstance(elementType, metadata.ArrayDimension!.Value); if (elementType == typeof(string)) { for (int i = 0; i < values.Length; i++) values.SetValue(string.Empty, i); } return values; } #endregion #region Condition Refresh /// /// The OPC UA request context for the condition refresh operation. /// The monitored event items that should receive retained alarm conditions. 
public override ServiceResult ConditionRefresh(OperationContext context, IList<IEventMonitoredItem> monitoredItems)
{
    // Queue every retained alarm condition to each monitored item. This runs under Lock
    // because the dispatch loop changes alarm condition state while holding the same Lock.
    lock (Lock)
    {
        foreach (var info in _alarmInAlarmTags.Values)
        {
            var condition = info.ConditionNode;
            if (condition == null || condition.Retain?.Value != true)
                continue;

            foreach (var item in monitoredItems)
            {
                item.QueueEvent(condition);
            }
        }
    }

    return ServiceResult.Good;
}

#endregion

#region HistoryRead

/// <inheritdoc />
protected override void HistoryReadRawModified(
    ServerSystemContext context,
    ReadRawModifiedDetails details,
    TimestampsToReturn timestampsToReturn,
    IList<HistoryReadValueId> nodesToRead,
    IList<HistoryReadResult> results,
    IList<ServiceResult> errors,
    List<NodeHandle> nodesToProcess,
    IDictionary<NodeId, NodeState> cache)
{
    foreach (var handle in nodesToProcess)
    {
        var idx = handle.Index;
        var nodeIdStr = handle.NodeId?.Identifier as string;
        if (nodeIdStr == null || !_nodeIdToTagReference.TryGetValue(nodeIdStr, out var tagRef))
        {
            errors[idx] = new ServiceResult(StatusCodes.BadNodeIdUnknown);
            continue;
        }

        if (_historianDataSource == null)
        {
            errors[idx] = new ServiceResult(StatusCodes.BadHistoryOperationUnsupported);
            continue;
        }

        try
        {
            // NumValuesPerNode is unsigned: clamp instead of casting so values above
            // int.MaxValue cannot wrap to a negative limit. Zero is passed through as zero.
            var maxValues = (int)Math.Min(details.NumValuesPerNode, (uint)int.MaxValue);

            var dataValues = _historianDataSource
                .ReadRawAsync(tagRef, details.StartTime, details.EndTime, maxValues)
                .GetAwaiter().GetResult();

            var historyData = new HistoryData();
            historyData.DataValues.AddRange(dataValues);

            results[idx] = new HistoryReadResult
            {
                StatusCode = StatusCodes.Good,
                HistoryData = new ExtensionObject(historyData)
            };
            errors[idx] = ServiceResult.Good;
        }
        catch (Exception ex)
        {
            Log.Warning(ex, "HistoryRead raw failed for {TagRef}", tagRef);
            errors[idx] = new ServiceResult(StatusCodes.BadInternalError);
        }
    }
}

/// <inheritdoc />
protected override void HistoryReadProcessed(
    ServerSystemContext context,
    ReadProcessedDetails details,
    TimestampsToReturn timestampsToReturn,
    IList<HistoryReadValueId> nodesToRead,
    IList<HistoryReadResult> results,
    IList<ServiceResult> errors,
    List<NodeHandle> nodesToProcess,
    IDictionary<NodeId, NodeState> cache)
{
    foreach (var handle in nodesToProcess)
    {
        var idx = handle.Index;
        var nodeIdStr = handle.NodeId?.Identifier as string;
        if (nodeIdStr == null || !_nodeIdToTagReference.TryGetValue(nodeIdStr, out var tagRef))
        {
            errors[idx] = new ServiceResult(StatusCodes.BadNodeIdUnknown);
            continue;
        }

        if (_historianDataSource == null)
        {
            errors[idx] = new ServiceResult(StatusCodes.BadHistoryOperationUnsupported);
            continue;
        }

        if (details.AggregateType == null || details.AggregateType.Count == 0)
        {
            errors[idx] = new ServiceResult(StatusCodes.BadAggregateListMismatch);
            continue;
        }

        // Use the aggregate at the node's own index; fall back to the first entry
        // when the list is shorter than the node list.
        var aggregateId = details.AggregateType[idx < details.AggregateType.Count ? idx : 0];
        var column = HistorianDataSource.MapAggregateToColumn(aggregateId);
        if (column == null)
        {
            errors[idx] = new ServiceResult(StatusCodes.BadAggregateNotSupported);
            continue;
        }

        try
        {
            var dataValues = _historianDataSource
                .ReadAggregateAsync(tagRef, details.StartTime, details.EndTime, details.ProcessingInterval, column)
                .GetAwaiter().GetResult();

            var historyData = new HistoryData();
            historyData.DataValues.AddRange(dataValues);

            results[idx] = new HistoryReadResult
            {
                StatusCode = StatusCodes.Good,
                HistoryData = new ExtensionObject(historyData)
            };
            errors[idx] = ServiceResult.Good;
        }
        catch (Exception ex)
        {
            Log.Warning(ex, "HistoryRead processed failed for {TagRef}", tagRef);
            errors[idx] = new ServiceResult(StatusCodes.BadInternalError);
        }
    }
}

#endregion

#region Subscription Delivery

/// <summary>
/// Called by the OPC UA framework during monitored item creation.
/// Triggers ref-counted MXAccess subscriptions early so the runtime value
/// can arrive before the initial publish to the client.
/// </summary>
/// <inheritdoc />
protected override void OnMonitoredItemCreated(ServerSystemContext context, NodeHandle handle, MonitoredItem monitoredItem)
{
    base.OnMonitoredItemCreated(context, handle, monitoredItem);

    var nodeIdStr = handle?.NodeId?.Identifier as string;
    if (nodeIdStr != null && _nodeIdToTagReference.TryGetValue(nodeIdStr, out var tagRef))
        SubscribeTag(tagRef);
}

/// <summary>
/// Called by the OPC UA framework after monitored items are deleted.
/// Decrements ref-counted MXAccess subscriptions.
/// </summary>
/// <inheritdoc />
protected override void OnDeleteMonitoredItemsComplete(ServerSystemContext context, IList<IMonitoredItem> monitoredItems)
{
    foreach (var item in monitoredItems)
    {
        var nodeIdStr = GetNodeIdString(item);
        if (nodeIdStr != null && _nodeIdToTagReference.TryGetValue(nodeIdStr, out var tagRef))
            UnsubscribeTag(tagRef);
    }
}

/// <summary>
/// Called by the OPC UA framework after monitored items are transferred to a new session.
/// Rebuilds MXAccess subscription bookkeeping when transferred items arrive without local in-memory state.
/// </summary>
/// <inheritdoc />
protected override void OnMonitoredItemsTransferred(ServerSystemContext context, IList<IMonitoredItem> monitoredItems)
{
    base.OnMonitoredItemsTransferred(context, monitoredItems);

    // Collect the tag reference behind every transferred item that maps to a known tag.
    var transferredTagRefs = new List<string>();
    foreach (var item in monitoredItems)
    {
        var identifier = GetNodeIdString(item);
        if (identifier != null && _nodeIdToTagReference.TryGetValue(identifier, out var mappedTagRef))
            transferredTagRefs.Add(mappedTagRef);
    }

    RestoreTransferredSubscriptions(transferredTagRefs);
}

/// <summary>
/// Returns the string identifier of the node behind a monitored item, or <c>null</c> when the
/// item's manager handle is not a <see cref="NodeState"/> or its identifier is not a string.
/// </summary>
private static string? GetNodeIdString(IMonitoredItem item)
{
    return (item.ManagerHandle is NodeState node)
        ? node.NodeId?.Identifier as string
        : null;
}

/// <summary>
/// Increments the subscription reference count for a Galaxy tag and opens the runtime subscription when the first OPC UA monitored item appears.
/// </summary>
/// <param name="fullTagReference">The fully qualified Galaxy tag reference to subscribe.</param>
internal void SubscribeTag(string fullTagReference)
{
    bool isFirstReference;
    lock (Lock)
    {
        isFirstReference = !_subscriptionRefCounts.TryGetValue(fullTagReference, out var existing);
        _subscriptionRefCounts[fullTagReference] = isFirstReference ? 1 : existing + 1;
    }

    if (!isFirstReference)
        return;

    // The runtime call is started outside the lock and its result is not awaited.
    _ = _mxAccessClient.SubscribeAsync(fullTagReference, (_, _) => { });
}

/// <summary>
/// Decrements the subscription reference count for a Galaxy tag and closes the runtime subscription when no OPC UA monitored items remain.
/// </summary>
/// <param name="fullTagReference">The fully qualified Galaxy tag reference to unsubscribe.</param>
internal void UnsubscribeTag(string fullTagReference)
{
    lock (Lock)
    {
        // Unknown tag: nothing to release.
        if (!_subscriptionRefCounts.TryGetValue(fullTagReference, out var remaining))
            return;

        // Other monitored items still reference the tag: only count down.
        if (remaining > 1)
        {
            _subscriptionRefCounts[fullTagReference] = remaining - 1;
            return;
        }

        _subscriptionRefCounts.Remove(fullTagReference);
    }

    // Last reference released: close the runtime subscription outside the lock.
    _ = _mxAccessClient.UnsubscribeAsync(fullTagReference);
}

/// <summary>
/// Rebuilds subscription reference counts for monitored items that were transferred by the OPC UA stack.
/// </summary>
/// <remarks>
/// Existing in-memory bookkeeping is preserved to avoid double-counting normal in-process transfers.
/// </remarks>
/// <param name="fullTagReferences">The Galaxy tag references represented by the transferred monitored items.</param>
internal void RestoreTransferredSubscriptions(IEnumerable<string> fullTagReferences)
{
    var transferredCounts = fullTagReferences
        .GroupBy(tagRef => tagRef, StringComparer.OrdinalIgnoreCase)
        .ToDictionary(g => g.Key, g => g.Count(), StringComparer.OrdinalIgnoreCase);

    var tagsToSubscribe = new List<string>();
    foreach (var kvp in transferredCounts)
    {
        lock (Lock)
        {
            // Tags that already have a count keep it; only unknown tags are restored.
            if (_subscriptionRefCounts.ContainsKey(kvp.Key))
                continue;

            _subscriptionRefCounts[kvp.Key] = kvp.Value;
            tagsToSubscribe.Add(kvp.Key);
        }
    }

    foreach (var tagRef in tagsToSubscribe)
        _ = _mxAccessClient.SubscribeAsync(tagRef, (_, _) => { });
}

/// <summary>
/// Records the latest value for a tag in the pending-change map and signals the dispatch thread.
/// A newer value for the same address replaces an undelivered older one.
/// </summary>
/// <param name="address">The tag address whose value changed.</param>
/// <param name="vtq">The new value, timestamp and quality.</param>
private void OnMxAccessDataChange(string address, Vtq vtq)
{
    if (_dispatchDisposed)
        return;

    Interlocked.Increment(ref _totalMxChangeEvents);
    _pendingDataChanges[address] = vtq;

    try
    {
        _dataChangeSignal.Set();
    }
    catch (ObjectDisposedException)
    {
        // Shutdown may race with one final callback from the runtime.
    }
}

#endregion

#region Data Change Dispatch

/// <summary>
/// Starts the background thread that runs <see cref="DispatchLoop"/>.
/// </summary>
private void StartDispatchThread()
{
    _dispatchRunning = true;
    _dispatchThread = new Thread(DispatchLoop)
    {
        Name = "OpcUaDataChangeDispatch",
        IsBackground = true
    };
    _dispatchThread.Start();
}

/// <summary>
/// Asks the dispatch thread to stop, wakes it, and waits up to five seconds for it to exit.
/// </summary>
private void StopDispatchThread()
{
    _dispatchRunning = false;

    try
    {
        _dataChangeSignal.Set();
    }
    catch (ObjectDisposedException)
    {
        // The signal was already disposed by an earlier Dispose call; nothing left to wake.
    }

    _dispatchThread?.Join(TimeSpan.FromSeconds(5));
}

/// <summary>
/// Interprets a runtime value as a boolean flag: <c>true</c> for a boolean true or a non-zero
/// <see cref="int"/>, <c>false</c> for everything else.
/// </summary>
private static bool IsAlarmFlagSet(object? value)
{
    return value is true || (value is int number && number != 0);
}

/// <summary>
/// Body of the dispatch thread. Each cycle drains the pending-change map, prepares node updates
/// and alarm transitions, then applies them to the address space under <c>Lock</c>.
/// </summary>
private void DispatchLoop()
{
    Log.Information("Data change dispatch thread started");

    while (_dispatchRunning)
    {
        try
        {
            _dataChangeSignal.WaitOne(TimeSpan.FromMilliseconds(100));
            if (!_dispatchRunning)
                break;

            var keys = _pendingDataChanges.Keys.ToList();
            if (keys.Count == 0)
            {
                ReportDispatchMetricsIfDue();
                continue;
            }

            // Prepare updates outside the Lock. Shared-state lookups stay inside the Lock.
            var updates = new List<(string address, BaseDataVariableState variable, DataValue dataValue)>(keys.Count);
            var pendingAlarmEvents = new List<(string address, AlarmInfo info, bool active, ushort? severity, string? message)>();
            var pendingAckedEvents = new List<(AlarmInfo info, bool acked)>();

            foreach (var address in keys)
            {
                if (!_pendingDataChanges.TryRemove(address, out var vtq))
                    continue;

                AlarmInfo? alarmInfo = null;
                bool newInAlarm = false;

                lock (Lock)
                {
                    if (_tagToVariableNode.TryGetValue(address, out var variable))
                    {
                        try
                        {
                            var dataValue = CreatePublishedDataValue(address, vtq);
                            updates.Add((address, variable, dataValue));
                        }
                        catch (Exception ex)
                        {
                            Log.Warning(ex, "Error preparing data change for {Address}", address);
                        }
                    }

                    // Only a change of the in-alarm state produces an alarm event.
                    if (_alarmInAlarmTags.TryGetValue(address, out alarmInfo))
                    {
                        newInAlarm = IsAlarmFlagSet(vtq.Value);
                        if (newInAlarm == alarmInfo.LastInAlarm)
                            alarmInfo = null;
                    }

                    // Check for Acked transitions
                    if (_alarmAckedTags.TryGetValue(address, out var ackedAlarmInfo))
                        pendingAckedEvents.Add((ackedAlarmInfo, IsAlarmFlagSet(vtq.Value)));
                }

                if (alarmInfo == null)
                    continue;

                ushort? severity = null;
                string? message = null;

                // On activation, refresh severity and message from the runtime. These blocking
                // reads happen outside the Lock and are best-effort.
                if (newInAlarm)
                {
                    try
                    {
                        var pVtq = _mxAccessClient.ReadAsync(alarmInfo.PriorityTagReference).GetAwaiter().GetResult();
                        if (pVtq.Value is int ip)
                            severity = (ushort)System.Math.Min(System.Math.Max(ip, 1), 1000);
                        else if (pVtq.Value is short sp)
                            severity = (ushort)System.Math.Min(System.Math.Max((int)sp, 1), 1000);
                    }
                    catch (Exception ex)
                    {
                        // Keep the previously cached severity when refresh reads fail.
                        Log.Debug(ex, "Alarm severity refresh failed for {Source}", alarmInfo.SourceName);
                    }

                    try
                    {
                        var dVtq = _mxAccessClient.ReadAsync(alarmInfo.DescAttrNameTagReference).GetAwaiter().GetResult();
                        if (dVtq.Value is string desc && !string.IsNullOrEmpty(desc))
                            message = desc;
                    }
                    catch (Exception ex)
                    {
                        // Keep the previously cached message when refresh reads fail.
                        Log.Debug(ex, "Alarm message refresh failed for {Source}", alarmInfo.SourceName);
                    }
                }

                pendingAlarmEvents.Add((address, alarmInfo, newInAlarm, severity, message));
            }

            // Apply under Lock so ClearChangeMasks propagates to monitored items.
            if (updates.Count > 0 || pendingAlarmEvents.Count > 0 || pendingAckedEvents.Count > 0)
            {
                lock (Lock)
                {
                    foreach (var (address, variable, dataValue) in updates)
                    {
                        // Skip nodes that were replaced or removed since the update was prepared.
                        if (!_tagToVariableNode.TryGetValue(address, out var currentVariable) || !ReferenceEquals(currentVariable, variable))
                            continue;

                        variable.Value = dataValue.Value;
                        variable.StatusCode = dataValue.StatusCode;
                        variable.Timestamp = dataValue.SourceTimestamp;
                        variable.ClearChangeMasks(SystemContext, false);
                    }

                    foreach (var (address, info, active, severity, message) in pendingAlarmEvents)
                    {
                        if (!_alarmInAlarmTags.TryGetValue(address, out var currentInfo) || !ReferenceEquals(currentInfo, info))
                            continue;

                        // Re-check under Lock: the state may already match.
                        if (currentInfo.LastInAlarm == active)
                            continue;

                        currentInfo.LastInAlarm = active;
                        if (severity.HasValue)
                            currentInfo.CachedSeverity = severity.Value;
                        if (!string.IsNullOrEmpty(message))
                            currentInfo.CachedMessage = message!;

                        try
                        {
                            ReportAlarmEvent(currentInfo, active);
                        }
                        catch (Exception ex)
                        {
                            Log.Warning(ex, "Error reporting alarm event for {Source}", currentInfo.SourceName);
                        }
                    }

                    // Apply Acked state changes
                    foreach (var (info, acked) in pendingAckedEvents)
                    {
                        var condition = info.ConditionNode;
                        if (condition == null)
                            continue;

                        try
                        {
                            condition.SetAcknowledgedState(SystemContext, acked);

                            // Retain the condition while it is active or unacknowledged.
                            if (condition.Retain != null)
                                condition.Retain.Value = (condition.ActiveState?.Id?.Value == true) || !acked;

                            if (_tagToVariableNode.TryGetValue(info.SourceTagReference, out var src) && src.Parent != null)
                                src.Parent.ReportEvent(SystemContext, condition);
                            Server.ReportEvent(SystemContext, condition);

                            Log.Information("Alarm {AckState}: {Source}", acked ? "ACKNOWLEDGED" : "UNACKNOWLEDGED", info.SourceName);
                        }
                        catch (Exception ex)
                        {
                            Log.Warning(ex, "Error updating acked state for {Source}", info.SourceName);
                        }
                    }
                }
            }

            Interlocked.Add(ref _totalDispatchBatchSize, updates.Count);
            Interlocked.Increment(ref _dispatchCycleCount);
            ReportDispatchMetricsIfDue();
        }
        catch (Exception ex)
        {
            Log.Error(ex, "Unhandled error in data change dispatch loop");
        }
    }

    Log.Information("Data change dispatch thread stopped");
}

/// <summary>
/// Logs dispatch throughput once at least 60 seconds have passed since the previous report,
/// then resets the rolling batch counters.
/// </summary>
private void ReportDispatchMetricsIfDue()
{
    var now = DateTime.UtcNow;
    var elapsed = (now - _lastMetricsReportTime).TotalSeconds;
    if (elapsed < 60)
        return;

    var totalEvents = Interlocked.Read(ref _totalMxChangeEvents);
    var lastReported = Interlocked.Read(ref _lastReportedMxChangeEvents);
    var eventsPerSecond = (totalEvents - lastReported) / elapsed;
    Interlocked.Exchange(ref _lastReportedMxChangeEvents, totalEvents);

    var batchSize = Interlocked.Read(ref _totalDispatchBatchSize);
    var cycles = Interlocked.Read(ref _dispatchCycleCount);
    var avgQueueSize = cycles > 0 ? (double)batchSize / cycles : 0;

    // Reset rolling counters
    Interlocked.Exchange(ref _totalDispatchBatchSize, 0);
    Interlocked.Exchange(ref _dispatchCycleCount, 0);

    _lastMetricsReportTime = now;
    _lastEventsPerSecond = eventsPerSecond;
    _lastAvgBatchSize = avgQueueSize;

    Log.Information(
        "DataChange dispatch: EventsPerSec={EventsPerSec:F1}, AvgBatchSize={AvgBatchSize:F1}, PendingItems={Pending}, TotalEvents={Total}",
        eventsPerSecond, avgQueueSize, _pendingDataChanges.Count, totalEvents);
}

/// <inheritdoc />
protected override void Dispose(bool disposing)
{
    if (disposing)
    {
        // Stop accepting runtime callbacks before the dispatch thread and its signal go away.
        _dispatchDisposed = true;
        _mxAccessClient.OnTagValueChanged -= OnMxAccessDataChange;
        StopDispatchThread();
        _dataChangeSignal.Dispose();
    }

    base.Dispose(disposing);
}

#endregion
}
}