Add security classification, alarm detection, historical data access, and primitive grouping

Wire Galaxy security_classification to OPC UA AccessLevel (ReadOnly for SecuredWrite/VerifiedWrite/ViewOnly).
Use deployed package chain for attribute queries to exclude undeployed attributes.
Group primitive attributes under their parent variable node (merged Variable+Object).
Add is_historized and is_alarm detection via HistoryExtension/AlarmExtension primitives.
Implement OPC UA HistoryRead backed by Wonderware Historian Runtime database.
Implement AlarmConditionState nodes driven by InAlarm with condition refresh support.
Add historyread and alarms CLI commands for testing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-03-26 11:32:33 -04:00
parent bb0a89b2a1
commit 415e62c585
30 changed files with 2734 additions and 217 deletions

View File

@@ -24,5 +24,10 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.Configuration
/// Gets or sets the embedded dashboard settings used to surface service health to operators.
/// </summary>
public DashboardConfiguration Dashboard { get; set; } = new DashboardConfiguration();
/// <summary>
/// Gets or sets the Wonderware Historian connection settings used to serve OPC UA historical data.
/// </summary>
public HistorianConfiguration Historian { get; set; } = new HistorianConfiguration();
}
}

View File

@@ -0,0 +1,23 @@
namespace ZB.MOM.WW.LmxOpcUa.Host.Configuration
{
/// <summary>
/// Wonderware Historian database configuration for OPC UA historical data access.
/// </summary>
public class HistorianConfiguration
{
/// <summary>
/// Gets or sets the connection string for the Wonderware Historian Runtime database.
/// </summary>
public string ConnectionString { get; set; } = "Server=localhost;Database=Runtime;Integrated Security=true;";
/// <summary>
/// Gets or sets the SQL command timeout in seconds for historian queries.
/// </summary>
public int CommandTimeoutSeconds { get; set; } = 30;
/// <summary>
/// Gets or sets the maximum number of values returned per HistoryRead request.
/// </summary>
public int MaxValuesPerRead { get; set; } = 10000;
}
}

View File

@@ -54,5 +54,21 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.Domain
/// Gets or sets the source classification that explains whether the attribute comes from configuration, calculation, or runtime data.
/// </summary>
public string AttributeSource { get; set; } = "";
/// <summary>
/// Gets or sets the Galaxy security classification that determines OPC UA write access.
/// 0=FreeAccess, 1=Operate (default), 2=SecuredWrite, 3=VerifiedWrite, 4=Tune, 5=Configure, 6=ViewOnly.
/// </summary>
public int SecurityClassification { get; set; } = 1;
/// <summary>
/// Gets or sets a value indicating whether the attribute has a HistoryExtension primitive and is historized by the Wonderware Historian.
/// </summary>
public bool IsHistorized { get; set; }
/// <summary>
/// Gets or sets a value indicating whether the attribute has an AlarmExtension primitive and is an alarm.
/// </summary>
public bool IsAlarm { get; set; }
}
}

View File

@@ -0,0 +1,28 @@
namespace ZB.MOM.WW.LmxOpcUa.Host.Domain
{
/// <summary>
/// Maps Galaxy security classification values to OPC UA write access decisions.
/// See gr/data_type_mapping.md for the full mapping table.
/// </summary>
public static class SecurityClassificationMapper
{
/// <summary>
/// Determines whether an attribute with the given security classification should allow writes.
/// </summary>
/// <param name="securityClassification">The Galaxy security classification value.</param>
/// <returns><see langword="true"/> for FreeAccess (0), Operate (1), Tune (4), Configure (5);
/// <see langword="false"/> for SecuredWrite (2), VerifiedWrite (3), ViewOnly (6).</returns>
public static bool IsWritable(int securityClassification)
{
switch (securityClassification)
{
case 2: // SecuredWrite
case 3: // VerifiedWrite
case 6: // ViewOnly
return false;
default:
return true;
}
}
}
}

View File

@@ -51,62 +51,132 @@ WHERE td.category_id IN (1, 3, 4, 10, 11, 13, 17, 24, 26)
ORDER BY parent_gobject_id, g.tag_name";
private const string AttributesSql = @"
;WITH template_chain AS (
SELECT g.gobject_id, g.derived_from_gobject_id, 0 AS depth
;WITH deployed_package_chain AS (
SELECT g.gobject_id, p.package_id, p.derived_from_package_id, 0 AS depth
FROM gobject g
WHERE g.is_template = 0
INNER JOIN package p ON p.package_id = g.deployed_package_id
WHERE g.is_template = 0 AND g.deployed_package_id <> 0
UNION ALL
SELECT tc.gobject_id, t.derived_from_gobject_id, tc.depth + 1
FROM template_chain tc
INNER JOIN gobject t ON t.gobject_id = tc.derived_from_gobject_id
WHERE tc.derived_from_gobject_id <> 0 AND tc.depth < 10
SELECT dpc.gobject_id, p.package_id, p.derived_from_package_id, dpc.depth + 1
FROM deployed_package_chain dpc
INNER JOIN package p ON p.package_id = dpc.derived_from_package_id
WHERE dpc.derived_from_package_id <> 0 AND dpc.depth < 10
)
SELECT DISTINCT
g.gobject_id,
g.tag_name,
da.attribute_name,
g.tag_name + '.' + da.attribute_name
+ CASE WHEN da.is_array = 1 THEN '[]' ELSE '' END
AS full_tag_reference,
da.mx_data_type,
dt.description AS data_type_name,
da.is_array,
CASE WHEN da.is_array = 1
THEN CONVERT(int, CONVERT(varbinary(2),
SUBSTRING(da.mx_value, 15, 2) + SUBSTRING(da.mx_value, 13, 2), 2))
ELSE NULL
END AS array_dimension,
da.mx_attribute_category,
da.security_classification
FROM template_chain tc
INNER JOIN dynamic_attribute da
ON da.gobject_id = tc.derived_from_gobject_id
INNER JOIN gobject g
ON g.gobject_id = tc.gobject_id
INNER JOIN template_definition td
ON td.template_definition_id = g.template_definition_id
LEFT JOIN data_type dt
ON dt.mx_data_type = da.mx_data_type
WHERE td.category_id IN (1, 3, 4, 10, 11, 13, 17, 24, 26)
AND g.is_template = 0
AND g.deployed_package_id <> 0
AND da.attribute_name NOT LIKE '[_]%'
AND da.attribute_name NOT LIKE '%.Description'
AND da.mx_attribute_category IN (2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 24)
ORDER BY g.tag_name, da.attribute_name";
SELECT gobject_id, tag_name, attribute_name, full_tag_reference,
mx_data_type, data_type_name, is_array, array_dimension,
mx_attribute_category, security_classification, is_historized, is_alarm
FROM (
SELECT
dpc.gobject_id,
g.tag_name,
da.attribute_name,
g.tag_name + '.' + da.attribute_name
+ CASE WHEN da.is_array = 1 THEN '[]' ELSE '' END
AS full_tag_reference,
da.mx_data_type,
dt.description AS data_type_name,
da.is_array,
CASE WHEN da.is_array = 1
THEN CONVERT(int, CONVERT(varbinary(2),
SUBSTRING(da.mx_value, 15, 2) + SUBSTRING(da.mx_value, 13, 2), 2))
ELSE NULL
END AS array_dimension,
da.mx_attribute_category,
da.security_classification,
CASE WHEN EXISTS (
SELECT 1 FROM deployed_package_chain dpc2
INNER JOIN primitive_instance pi ON pi.package_id = dpc2.package_id AND pi.primitive_name = da.attribute_name
INNER JOIN primitive_definition pd ON pd.primitive_definition_id = pi.primitive_definition_id AND pd.primitive_name = 'HistoryExtension'
WHERE dpc2.gobject_id = dpc.gobject_id
) THEN 1 ELSE 0 END AS is_historized,
CASE WHEN EXISTS (
SELECT 1 FROM deployed_package_chain dpc2
INNER JOIN primitive_instance pi ON pi.package_id = dpc2.package_id AND pi.primitive_name = da.attribute_name
INNER JOIN primitive_definition pd ON pd.primitive_definition_id = pi.primitive_definition_id AND pd.primitive_name = 'AlarmExtension'
WHERE dpc2.gobject_id = dpc.gobject_id
) THEN 1 ELSE 0 END AS is_alarm,
ROW_NUMBER() OVER (
PARTITION BY dpc.gobject_id, da.attribute_name
ORDER BY dpc.depth
) AS rn
FROM deployed_package_chain dpc
INNER JOIN dynamic_attribute da
ON da.package_id = dpc.package_id
INNER JOIN gobject g
ON g.gobject_id = dpc.gobject_id
INNER JOIN template_definition td
ON td.template_definition_id = g.template_definition_id
LEFT JOIN data_type dt
ON dt.mx_data_type = da.mx_data_type
WHERE td.category_id IN (1, 3, 4, 10, 11, 13, 17, 24, 26)
AND da.attribute_name NOT LIKE '[_]%'
AND da.attribute_name NOT LIKE '%.Description'
AND da.mx_attribute_category IN (2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 24)
) ranked
WHERE rn = 1
ORDER BY tag_name, attribute_name";
private const string ExtendedAttributesSql = @"
;WITH template_chain AS (
SELECT g.gobject_id, g.derived_from_gobject_id, 0 AS depth
;WITH deployed_package_chain AS (
SELECT g.gobject_id, p.package_id, p.derived_from_package_id, 0 AS depth
FROM gobject g
WHERE g.is_template = 0
INNER JOIN package p ON p.package_id = g.deployed_package_id
WHERE g.is_template = 0 AND g.deployed_package_id <> 0
UNION ALL
SELECT tc.gobject_id, t.derived_from_gobject_id, tc.depth + 1
FROM template_chain tc
INNER JOIN gobject t ON t.gobject_id = tc.derived_from_gobject_id
WHERE tc.derived_from_gobject_id <> 0 AND tc.depth < 10
SELECT dpc.gobject_id, p.package_id, p.derived_from_package_id, dpc.depth + 1
FROM deployed_package_chain dpc
INNER JOIN package p ON p.package_id = dpc.derived_from_package_id
WHERE dpc.derived_from_package_id <> 0 AND dpc.depth < 10
),
ranked_dynamic AS (
SELECT
dpc.gobject_id,
g.tag_name,
da.attribute_name,
g.tag_name + '.' + da.attribute_name
+ CASE WHEN da.is_array = 1 THEN '[]' ELSE '' END
AS full_tag_reference,
da.mx_data_type,
dt.description AS data_type_name,
da.is_array,
CASE WHEN da.is_array = 1
THEN CONVERT(int, CONVERT(varbinary(2),
SUBSTRING(da.mx_value, 15, 2) + SUBSTRING(da.mx_value, 13, 2), 2))
ELSE NULL
END AS array_dimension,
da.mx_attribute_category,
da.security_classification,
CASE WHEN EXISTS (
SELECT 1 FROM deployed_package_chain dpc2
INNER JOIN primitive_instance pi ON pi.package_id = dpc2.package_id AND pi.primitive_name = da.attribute_name
INNER JOIN primitive_definition pd ON pd.primitive_definition_id = pi.primitive_definition_id AND pd.primitive_name = 'HistoryExtension'
WHERE dpc2.gobject_id = dpc.gobject_id
) THEN 1 ELSE 0 END AS is_historized,
CASE WHEN EXISTS (
SELECT 1 FROM deployed_package_chain dpc2
INNER JOIN primitive_instance pi ON pi.package_id = dpc2.package_id AND pi.primitive_name = da.attribute_name
INNER JOIN primitive_definition pd ON pd.primitive_definition_id = pi.primitive_definition_id AND pd.primitive_name = 'AlarmExtension'
WHERE dpc2.gobject_id = dpc.gobject_id
) THEN 1 ELSE 0 END AS is_alarm,
ROW_NUMBER() OVER (
PARTITION BY dpc.gobject_id, da.attribute_name
ORDER BY dpc.depth
) AS rn
FROM deployed_package_chain dpc
INNER JOIN dynamic_attribute da
ON da.package_id = dpc.package_id
INNER JOIN gobject g
ON g.gobject_id = dpc.gobject_id
INNER JOIN template_definition td
ON td.template_definition_id = g.template_definition_id
LEFT JOIN data_type dt
ON dt.mx_data_type = da.mx_data_type
WHERE td.category_id IN (1, 3, 4, 10, 11, 13, 17, 24, 26)
AND da.attribute_name NOT LIKE '[_]%'
AND da.attribute_name NOT LIKE '%.Description'
AND da.mx_attribute_category IN (2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 24)
)
SELECT DISTINCT
SELECT
gobject_id,
tag_name,
primitive_name,
@@ -118,6 +188,8 @@ SELECT DISTINCT
array_dimension,
mx_attribute_category,
security_classification,
is_historized,
is_alarm,
attribute_source
FROM (
SELECT
@@ -140,6 +212,8 @@ FROM (
END AS array_dimension,
ad.mx_attribute_category,
ad.security_classification,
CAST(0 AS int) AS is_historized,
CAST(0 AS int) AS is_alarm,
'primitive' AS attribute_source
FROM gobject g
INNER JOIN instance i
@@ -148,7 +222,7 @@ FROM (
ON td.template_definition_id = g.template_definition_id
AND td.runtime_clsid <> '{00000000-0000-0000-0000-000000000000}'
INNER JOIN package p
ON p.package_id = g.checked_in_package_id
ON p.package_id = g.deployed_package_id
INNER JOIN primitive_instance pi
ON pi.package_id = p.package_id
AND pi.property_bitmask & 0x10 <> 0x10
@@ -165,39 +239,22 @@ FROM (
UNION ALL
SELECT
g.gobject_id,
g.tag_name,
gobject_id,
tag_name,
'' AS primitive_name,
da.attribute_name,
g.tag_name + '.' + da.attribute_name
+ CASE WHEN da.is_array = 1 THEN '[]' ELSE '' END
AS full_tag_reference,
da.mx_data_type,
dt.description AS data_type_name,
da.is_array,
CASE WHEN da.is_array = 1
THEN CONVERT(int, CONVERT(varbinary(2),
SUBSTRING(da.mx_value, 15, 2) + SUBSTRING(da.mx_value, 13, 2), 2))
ELSE NULL
END AS array_dimension,
da.mx_attribute_category,
da.security_classification,
attribute_name,
full_tag_reference,
mx_data_type,
data_type_name,
is_array,
array_dimension,
mx_attribute_category,
security_classification,
is_historized,
is_alarm,
'dynamic' AS attribute_source
FROM template_chain tc
INNER JOIN dynamic_attribute da
ON da.gobject_id = tc.derived_from_gobject_id
INNER JOIN gobject g
ON g.gobject_id = tc.gobject_id
INNER JOIN template_definition td
ON td.template_definition_id = g.template_definition_id
LEFT JOIN data_type dt
ON dt.mx_data_type = da.mx_data_type
WHERE td.category_id IN (1, 3, 4, 10, 11, 13, 17, 24, 26)
AND g.is_template = 0
AND g.deployed_package_id <> 0
AND da.attribute_name NOT LIKE '[_]%'
AND da.attribute_name NOT LIKE '%.Description'
AND da.mx_attribute_category IN (2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 24)
FROM ranked_dynamic
WHERE rn = 1
) all_attributes
ORDER BY tag_name, primitive_name, attribute_name";
@@ -279,9 +336,10 @@ ORDER BY tag_name, primitive_name, attribute_name";
}
/// <summary>
/// Reads a row from the standard attributes query (10 columns).
/// Reads a row from the standard attributes query (12 columns).
/// Columns: gobject_id, tag_name, attribute_name, full_tag_reference, mx_data_type,
/// data_type_name, is_array, array_dimension, mx_attribute_category, security_classification
/// data_type_name, is_array, array_dimension, mx_attribute_category,
/// security_classification, is_historized, is_alarm
/// </summary>
private static GalaxyAttributeInfo ReadStandardAttribute(SqlDataReader reader)
{
@@ -294,15 +352,18 @@ ORDER BY tag_name, primitive_name, attribute_name";
MxDataType = Convert.ToInt32(reader.GetValue(4)),
DataTypeName = reader.IsDBNull(5) ? "" : reader.GetString(5),
IsArray = Convert.ToBoolean(reader.GetValue(6)),
ArrayDimension = reader.IsDBNull(7) ? null : (int?)Convert.ToInt32(reader.GetValue(7))
ArrayDimension = reader.IsDBNull(7) ? null : (int?)Convert.ToInt32(reader.GetValue(7)),
SecurityClassification = Convert.ToInt32(reader.GetValue(9)),
IsHistorized = Convert.ToInt32(reader.GetValue(10)) == 1,
IsAlarm = Convert.ToInt32(reader.GetValue(11)) == 1
};
}
/// <summary>
/// Reads a row from the extended attributes query (12 columns).
/// Reads a row from the extended attributes query (14 columns).
/// Columns: gobject_id, tag_name, primitive_name, attribute_name, full_tag_reference,
/// mx_data_type, data_type_name, is_array, array_dimension,
/// mx_attribute_category, security_classification, attribute_source
/// mx_attribute_category, security_classification, is_historized, is_alarm, attribute_source
/// </summary>
private static GalaxyAttributeInfo ReadExtendedAttribute(SqlDataReader reader)
{
@@ -317,7 +378,10 @@ ORDER BY tag_name, primitive_name, attribute_name";
DataTypeName = reader.IsDBNull(6) ? "" : reader.GetString(6),
IsArray = Convert.ToBoolean(reader.GetValue(7)),
ArrayDimension = reader.IsDBNull(8) ? null : (int?)Convert.ToInt32(reader.GetValue(8)),
AttributeSource = reader.IsDBNull(11) ? "" : reader.GetString(11)
SecurityClassification = Convert.ToInt32(reader.GetValue(10)),
IsHistorized = Convert.ToInt32(reader.GetValue(11)) == 1,
IsAlarm = Convert.ToInt32(reader.GetValue(12)) == 1,
AttributeSource = reader.IsDBNull(13) ? "" : reader.GetString(13)
};
}

View File

@@ -0,0 +1,156 @@
using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Threading;
using System.Threading.Tasks;
using Opc.Ua;
using Serilog;
using ZB.MOM.WW.LmxOpcUa.Host.Configuration;
namespace ZB.MOM.WW.LmxOpcUa.Host.Historian
{
/// <summary>
/// Reads historical data from the Wonderware Historian Runtime database.
/// </summary>
public class HistorianDataSource
{
private static readonly ILogger Log = Serilog.Log.ForContext<HistorianDataSource>();
private readonly HistorianConfiguration _config;
public HistorianDataSource(HistorianConfiguration config)
{
_config = config;
}
/// <summary>
/// Reads raw historical values for a tag from the Historian.
/// </summary>
public async Task<List<DataValue>> ReadRawAsync(
string tagName, DateTime startTime, DateTime endTime, int maxValues,
CancellationToken ct = default)
{
var results = new List<DataValue>();
var sql = maxValues > 0
? "SELECT TOP (@MaxValues) DateTime, Value, vValue, Quality FROM Runtime.dbo.History WHERE TagName = @TagName AND DateTime >= @StartTime AND DateTime <= @EndTime ORDER BY DateTime"
: "SELECT DateTime, Value, vValue, Quality FROM Runtime.dbo.History WHERE TagName = @TagName AND DateTime >= @StartTime AND DateTime <= @EndTime ORDER BY DateTime";
using var conn = new SqlConnection(_config.ConnectionString);
await conn.OpenAsync(ct);
using var cmd = new SqlCommand(sql, conn) { CommandTimeout = _config.CommandTimeoutSeconds };
cmd.Parameters.AddWithValue("@TagName", tagName);
cmd.Parameters.AddWithValue("@StartTime", startTime);
cmd.Parameters.AddWithValue("@EndTime", endTime);
if (maxValues > 0)
cmd.Parameters.AddWithValue("@MaxValues", maxValues);
using var reader = await cmd.ExecuteReaderAsync(ct);
while (await reader.ReadAsync(ct))
{
var timestamp = reader.GetDateTime(0);
object? value;
if (!reader.IsDBNull(1))
value = reader.GetDouble(1);
else if (!reader.IsDBNull(2))
value = reader.GetString(2);
else
value = null;
var quality = reader.IsDBNull(3) ? (byte)0 : Convert.ToByte(reader.GetValue(3));
results.Add(new DataValue
{
Value = new Variant(value),
SourceTimestamp = timestamp,
ServerTimestamp = timestamp,
StatusCode = MapQuality(quality)
});
}
Log.Debug("HistoryRead raw: {Tag} returned {Count} values ({Start} to {End})",
tagName, results.Count, startTime, endTime);
return results;
}
/// <summary>
/// Reads aggregate historical values for a tag from the Historian.
/// </summary>
public async Task<List<DataValue>> ReadAggregateAsync(
string tagName, DateTime startTime, DateTime endTime,
double intervalMs, string aggregateColumn,
CancellationToken ct = default)
{
var results = new List<DataValue>();
var sql = $"SELECT StartDateTime, [{aggregateColumn}] FROM Runtime.dbo.AnalogSummaryHistory " +
"WHERE TagName = @TagName AND StartDateTime >= @StartTime AND StartDateTime <= @EndTime " +
"AND wwResolution = @Resolution ORDER BY StartDateTime";
using var conn = new SqlConnection(_config.ConnectionString);
await conn.OpenAsync(ct);
using var cmd = new SqlCommand(sql, conn) { CommandTimeout = _config.CommandTimeoutSeconds };
cmd.Parameters.AddWithValue("@TagName", tagName);
cmd.Parameters.AddWithValue("@StartTime", startTime);
cmd.Parameters.AddWithValue("@EndTime", endTime);
cmd.Parameters.AddWithValue("@Resolution", (int)intervalMs);
using var reader = await cmd.ExecuteReaderAsync(ct);
while (await reader.ReadAsync(ct))
{
var timestamp = reader.GetDateTime(0);
var value = reader.IsDBNull(1) ? (object?)null : reader.GetDouble(1);
results.Add(new DataValue
{
Value = new Variant(value),
SourceTimestamp = timestamp,
ServerTimestamp = timestamp,
StatusCode = value != null ? StatusCodes.Good : StatusCodes.BadNoData
});
}
Log.Debug("HistoryRead aggregate ({Aggregate}): {Tag} returned {Count} values",
aggregateColumn, tagName, results.Count);
return results;
}
/// <summary>
/// Maps Wonderware Historian quality codes to OPC UA StatusCodes.
/// </summary>
public static StatusCode MapQuality(byte quality)
{
if (quality == 0)
return StatusCodes.Good;
if (quality == 1)
return StatusCodes.Bad;
if (quality >= 128)
return StatusCodes.Uncertain;
return StatusCodes.Bad;
}
/// <summary>
/// Maps an OPC UA aggregate NodeId to the corresponding Historian column name.
/// Returns null if the aggregate is not supported.
/// </summary>
public static string? MapAggregateToColumn(NodeId aggregateId)
{
if (aggregateId == ObjectIds.AggregateFunction_Average)
return "Average";
if (aggregateId == ObjectIds.AggregateFunction_Minimum)
return "Minimum";
if (aggregateId == ObjectIds.AggregateFunction_Maximum)
return "Maximum";
if (aggregateId == ObjectIds.AggregateFunction_Count)
return "ValueCount";
if (aggregateId == ObjectIds.AggregateFunction_Start)
return "First";
if (aggregateId == ObjectIds.AggregateFunction_End)
return "Last";
if (aggregateId == ObjectIds.AggregateFunction_StandardDeviationPopulation)
return "StdDev";
return null;
}
}
}

View File

@@ -84,6 +84,27 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
/// Gets or sets the declared array length when the attribute is a fixed-size array.
/// </summary>
public int? ArrayDimension { get; set; }
/// <summary>
/// Gets or sets the primitive name that groups the attribute under a sub-object node.
/// Empty for root-level attributes.
/// </summary>
public string PrimitiveName { get; set; } = "";
/// <summary>
/// Gets or sets the Galaxy security classification that determines OPC UA write access.
/// </summary>
public int SecurityClassification { get; set; } = 1;
/// <summary>
/// Gets or sets a value indicating whether the attribute is historized.
/// </summary>
public bool IsHistorized { get; set; }
/// <summary>
/// Gets or sets a value indicating whether the attribute is an alarm.
/// </summary>
public bool IsAlarm { get; set; }
}
/// <summary>
@@ -175,7 +196,11 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
FullTagReference = attr.FullTagReference,
MxDataType = attr.MxDataType,
IsArray = attr.IsArray,
ArrayDimension = attr.ArrayDimension
ArrayDimension = attr.ArrayDimension,
PrimitiveName = attr.PrimitiveName ?? "",
SecurityClassification = attr.SecurityClassification,
IsHistorized = attr.IsHistorized,
IsAlarm = attr.IsAlarm
});
model.NodeIdToTagReference[GetNodeIdentifier(attr)] = attr.FullTagReference;

View File

@@ -1,10 +1,13 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Opc.Ua;
using Opc.Ua.Server;
using Serilog;
using ZB.MOM.WW.LmxOpcUa.Host.Domain;
using ZB.MOM.WW.LmxOpcUa.Host.Historian;
using ZB.MOM.WW.LmxOpcUa.Host.Metrics;
namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
@@ -19,6 +22,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
private readonly IMxAccessClient _mxAccessClient;
private readonly PerformanceMetrics _metrics;
private readonly HistorianDataSource? _historianDataSource;
private readonly string _namespaceUri;
// NodeId → full_tag_reference for read/write resolution
@@ -32,6 +36,21 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
private readonly object _lock = new object();
private IDictionary<NodeId, IList<IReference>>? _externalReferences;
// Data change dispatch queue: decouples MXAccess STA callbacks from OPC UA framework Lock
private readonly ConcurrentDictionary<string, Vtq> _pendingDataChanges = new ConcurrentDictionary<string, Vtq>(StringComparer.OrdinalIgnoreCase);
private readonly AutoResetEvent _dataChangeSignal = new AutoResetEvent(false);
private Thread? _dispatchThread;
private volatile bool _dispatchRunning;
// Dispatch queue metrics
private long _totalMxChangeEvents;
private long _lastReportedMxChangeEvents;
private long _totalDispatchBatchSize;
private long _dispatchCycleCount;
private DateTime _lastMetricsReportTime = DateTime.UtcNow;
private double _lastEventsPerSecond;
private double _lastAvgBatchSize;
private sealed class TagMetadata
{
public int MxDataType { get; set; }
@@ -39,6 +58,22 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
public int? ArrayDimension { get; set; }
}
// Alarm tracking: maps InAlarm tag reference → alarm source info
private readonly Dictionary<string, AlarmInfo> _alarmInAlarmTags = new Dictionary<string, AlarmInfo>(StringComparer.OrdinalIgnoreCase);
private sealed class AlarmInfo
{
public string SourceTagReference { get; set; } = "";
public NodeId SourceNodeId { get; set; } = NodeId.Null;
public string SourceName { get; set; } = "";
public bool LastInAlarm { get; set; }
public AlarmConditionState? ConditionNode { get; set; }
public string PriorityTagReference { get; set; } = "";
public string DescAttrNameTagReference { get; set; } = "";
public ushort CachedSeverity { get; set; }
public string CachedMessage { get; set; } = "";
}
/// <summary>
/// Gets the mapping from OPC UA node identifiers to the Galaxy tag references used for runtime I/O.
/// </summary>
@@ -54,6 +89,26 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
/// </summary>
public int ObjectNodeCount { get; private set; }
/// <summary>
/// Gets the total number of MXAccess data change events received since startup.
/// </summary>
public long TotalMxChangeEvents => Interlocked.Read(ref _totalMxChangeEvents);
/// <summary>
/// Gets the number of items currently waiting in the dispatch queue.
/// </summary>
public int PendingDataChangeCount => _pendingDataChanges.Count;
/// <summary>
/// Gets the most recently computed MXAccess data change events per second.
/// </summary>
public double MxChangeEventsPerSecond => _lastEventsPerSecond;
/// <summary>
/// Gets the most recently computed average dispatch batch size (proxy for queue depth under load).
/// </summary>
public double AverageDispatchBatchSize => _lastAvgBatchSize;
/// <summary>
/// Initializes a new node manager for the Galaxy-backed OPC UA namespace.
/// </summary>
@@ -67,15 +122,20 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
ApplicationConfiguration configuration,
string namespaceUri,
IMxAccessClient mxAccessClient,
PerformanceMetrics metrics)
PerformanceMetrics metrics,
HistorianDataSource? historianDataSource = null)
: base(server, configuration, namespaceUri)
{
_namespaceUri = namespaceUri;
_mxAccessClient = mxAccessClient;
_metrics = metrics;
_historianDataSource = historianDataSource;
// Wire up data change delivery
_mxAccessClient.OnTagValueChanged += OnMxAccessDataChange;
// Start background dispatch thread
StartDispatchThread();
}
/// <inheritdoc />
@@ -100,6 +160,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
_nodeIdToTagReference.Clear();
_tagToVariableNode.Clear();
_tagMetadata.Clear();
_alarmInAlarmTags.Clear();
VariableNodeCount = 0;
ObjectNodeCount = 0;
@@ -111,9 +172,10 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
.GroupBy(a => a.GobjectId)
.ToDictionary(g => g.Key, g => g.ToList());
// Root folder
// Root folder — enable events so alarm events propagate to clients subscribed at root
var rootFolder = CreateFolder(null, "ZB", "ZB");
rootFolder.NodeId = new NodeId("ZB", NamespaceIndex);
rootFolder.EventNotifier = EventNotifiers.SubscribeToEvents;
rootFolder.AddReference(ReferenceTypeIds.Organizes, true, ObjectIds.ObjectsFolder);
AddPredefinedNode(SystemContext, rootFolder);
@@ -161,18 +223,201 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
// Create variable nodes for this object's attributes
if (attrsByObject.TryGetValue(obj.GobjectId, out var objAttrs))
{
foreach (var attr in objAttrs)
// Group by primitive_name: empty = direct child, non-empty = sub-object
var byPrimitive = objAttrs
.GroupBy(a => a.PrimitiveName ?? "")
.OrderBy(g => g.Key);
// Collect primitive group names so we know which direct attributes have children
var primitiveGroupNames = new HashSet<string>(
byPrimitive.Select(g => g.Key).Where(k => !string.IsNullOrEmpty(k)),
StringComparer.OrdinalIgnoreCase);
// Track variable nodes created for direct attributes that also have primitive children
var variableNodes = new Dictionary<string, BaseDataVariableState>(StringComparer.OrdinalIgnoreCase);
// First pass: create direct (root-level) attribute variables
var directGroup = byPrimitive.FirstOrDefault(g => string.IsNullOrEmpty(g.Key));
if (directGroup != null)
{
CreateAttributeVariable(node, attr);
foreach (var attr in directGroup)
{
var variable = CreateAttributeVariable(node, attr);
if (primitiveGroupNames.Contains(attr.AttributeName))
{
variableNodes[attr.AttributeName] = variable;
}
}
}
// Second pass: add primitive child attributes under the matching variable node
foreach (var group in byPrimitive)
{
if (string.IsNullOrEmpty(group.Key))
continue;
NodeState parentForAttrs;
if (variableNodes.TryGetValue(group.Key, out var existingVariable))
{
// Merge: use the existing variable node as parent
parentForAttrs = existingVariable;
}
else
{
// No matching dynamic attribute — create an object node
var primNode = CreateObject(node, group.Key, group.Key);
primNode.NodeId = new NodeId(obj.TagName + "." + group.Key, NamespaceIndex);
AddPredefinedNode(SystemContext, primNode);
parentForAttrs = primNode;
}
foreach (var attr in group)
{
CreateAttributeVariable(parentForAttrs, attr);
}
}
}
}
Log.Information("Address space built: {Objects} objects, {Variables} variables, {Mappings} tag references",
ObjectNodeCount, VariableNodeCount, _nodeIdToTagReference.Count);
// Build alarm tracking: create AlarmConditionState for each alarm attribute
foreach (var obj in sorted)
{
if (obj.IsArea) continue;
if (!attrsByObject.TryGetValue(obj.GobjectId, out var objAttrs)) continue;
var hasAlarms = false;
var alarmAttrs = objAttrs.Where(a => a.IsAlarm && string.IsNullOrEmpty(a.PrimitiveName)).ToList();
foreach (var alarmAttr in alarmAttrs)
{
var inAlarmTagRef = alarmAttr.FullTagReference.TrimEnd('[', ']') + ".InAlarm";
if (!_tagToVariableNode.ContainsKey(inAlarmTagRef))
continue;
var alarmNodeIdStr = alarmAttr.FullTagReference.EndsWith("[]")
? alarmAttr.FullTagReference.Substring(0, alarmAttr.FullTagReference.Length - 2)
: alarmAttr.FullTagReference;
// Find the source variable node for the alarm
_tagToVariableNode.TryGetValue(alarmAttr.FullTagReference, out var sourceVariable);
var sourceNodeId = new NodeId(alarmNodeIdStr, NamespaceIndex);
// Create AlarmConditionState attached to the source variable
var conditionNodeId = new NodeId(alarmNodeIdStr + ".Condition", NamespaceIndex);
var condition = new AlarmConditionState(sourceVariable);
condition.Create(SystemContext, conditionNodeId,
new QualifiedName(alarmAttr.AttributeName + "Alarm", NamespaceIndex),
new LocalizedText("en", alarmAttr.AttributeName + " Alarm"),
true);
condition.SourceNode.Value = sourceNodeId;
condition.SourceName.Value = alarmAttr.AttributeName;
condition.ConditionName.Value = alarmAttr.AttributeName;
condition.AutoReportStateChanges = true;
// Set initial state: enabled, inactive, acknowledged
condition.SetEnableState(SystemContext, true);
condition.SetActiveState(SystemContext, false);
condition.SetAcknowledgedState(SystemContext, true);
condition.SetSeverity(SystemContext, EventSeverity.Medium);
condition.Retain.Value = false;
condition.OnReportEvent = (context, node, e) => Server.ReportEvent(context, e);
// Add HasCondition reference from source to condition
if (sourceVariable != null)
{
sourceVariable.AddReference(ReferenceTypeIds.HasCondition, false, conditionNodeId);
condition.AddReference(ReferenceTypeIds.HasCondition, true, sourceNodeId);
}
AddPredefinedNode(SystemContext, condition);
var baseTagRef = alarmAttr.FullTagReference.TrimEnd('[', ']');
_alarmInAlarmTags[inAlarmTagRef] = new AlarmInfo
{
SourceTagReference = alarmAttr.FullTagReference,
SourceNodeId = sourceNodeId,
SourceName = alarmAttr.AttributeName,
ConditionNode = condition,
PriorityTagReference = baseTagRef + ".Priority",
DescAttrNameTagReference = baseTagRef + ".DescAttrName"
};
hasAlarms = true;
}
// Enable EventNotifier on object nodes that contain alarms
if (hasAlarms && nodeMap.TryGetValue(obj.GobjectId, out var objNode))
{
if (objNode is BaseObjectState objState)
objState.EventNotifier = EventNotifiers.SubscribeToEvents;
else if (objNode is FolderState folderState)
folderState.EventNotifier = EventNotifiers.SubscribeToEvents;
}
}
// Auto-subscribe to InAlarm tags so we detect alarm transitions
SubscribeAlarmTags();
Log.Information("Address space built: {Objects} objects, {Variables} variables, {Mappings} tag references, {Alarms} alarm tags",
ObjectNodeCount, VariableNodeCount, _nodeIdToTagReference.Count, _alarmInAlarmTags.Count);
}
}
private void SubscribeAlarmTags()
{
foreach (var kvp in _alarmInAlarmTags)
{
// Subscribe to InAlarm, Priority, and DescAttrName for each alarm
var tagsToSubscribe = new[] { kvp.Key, kvp.Value.PriorityTagReference, kvp.Value.DescAttrNameTagReference };
foreach (var tag in tagsToSubscribe)
{
if (string.IsNullOrEmpty(tag) || !_tagToVariableNode.ContainsKey(tag))
continue;
try
{
_mxAccessClient.SubscribeAsync(tag, (_, _) => { });
}
catch (Exception ex)
{
Log.Warning(ex, "Failed to auto-subscribe to alarm tag {Tag}", tag);
}
}
}
}
private void ReportAlarmEvent(AlarmInfo info, bool active)
{
var condition = info.ConditionNode;
if (condition == null)
return;
ushort severity = info.CachedSeverity;
string message = active
? (!string.IsNullOrEmpty(info.CachedMessage) ? info.CachedMessage : $"Alarm active: {info.SourceName}")
: $"Alarm cleared: {info.SourceName}";
condition.SetActiveState(SystemContext, active);
condition.Message.Value = new LocalizedText("en", message);
condition.SetSeverity(SystemContext, (EventSeverity)severity);
// Retain while active or unacknowledged
condition.Retain.Value = active || (condition.AckedState?.Id?.Value == false);
// Reset acknowledged state when alarm activates
if (active)
condition.SetAcknowledgedState(SystemContext, false);
// Report through the source node hierarchy so events reach subscribers on parent objects
if (_tagToVariableNode.TryGetValue(info.SourceTagReference, out var sourceVar) && sourceVar.Parent != null)
{
sourceVar.Parent.ReportEvent(SystemContext, condition);
}
// Also report to Server node for clients subscribed at server level
Server.ReportEvent(SystemContext, condition);
Log.Information("Alarm {State}: {Source} (Severity={Severity}, Message={Message})",
active ? "ACTIVE" : "CLEARED", info.SourceName, severity, message);
}
/// <summary>
/// Rebuilds the address space, removing old nodes and creating new ones. (OPC-010)
/// </summary>
@@ -197,6 +442,19 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
}
}
// Unsubscribe auto-subscribed alarm tags
foreach (var kvp in _alarmInAlarmTags)
{
foreach (var tag in new[] { kvp.Key, kvp.Value.PriorityTagReference, kvp.Value.DescAttrNameTagReference })
{
if (!string.IsNullOrEmpty(tag))
{
try { _mxAccessClient.UnsubscribeAsync(tag).GetAwaiter().GetResult(); }
catch { /* ignore */ }
}
}
}
// Remove all predefined nodes
foreach (var nodeId in PredefinedNodes.Keys.ToList())
{
@@ -260,7 +518,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
return result;
}
private void CreateAttributeVariable(NodeState parent, GalaxyAttributeInfo attr)
private BaseDataVariableState CreateAttributeVariable(NodeState parent, GalaxyAttributeInfo attr)
{
var opcUaDataTypeId = MxDataTypeMapper.MapToOpcUaDataType(attr.MxDataType);
var variable = CreateVariable(parent, attr.AttributeName, attr.AttributeName, new NodeId(opcUaDataTypeId),
@@ -274,8 +532,16 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
variable.ArrayDimensions = new ReadOnlyList<uint>(new List<uint> { (uint)attr.ArrayDimension.Value });
}
variable.AccessLevel = AccessLevels.CurrentReadOrWrite;
variable.UserAccessLevel = AccessLevels.CurrentReadOrWrite;
var accessLevel = SecurityClassificationMapper.IsWritable(attr.SecurityClassification)
? AccessLevels.CurrentReadOrWrite
: AccessLevels.CurrentRead;
if (attr.IsHistorized)
{
accessLevel |= AccessLevels.HistoryRead;
}
variable.AccessLevel = accessLevel;
variable.UserAccessLevel = accessLevel;
variable.Historizing = attr.IsHistorized;
variable.Value = NormalizePublishedValue(attr.FullTagReference, null);
variable.StatusCode = StatusCodes.BadWaitingForInitialData;
variable.Timestamp = DateTime.UtcNow;
@@ -290,6 +556,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
ArrayDimension = attr.ArrayDimension
};
VariableNodeCount++;
return variable;
}
private static string GetNodeIdentifier(GalaxyAttributeInfo attr)
@@ -412,6 +679,10 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
if (nodesToWrite[i].AttributeId != Attributes.Value)
continue;
// Skip if base rejected due to access level (read-only node)
if (errors[i] != null && errors[i].StatusCode == StatusCodes.BadNotWritable)
continue;
var nodeId = nodesToWrite[i].NodeId;
if (nodeId.NamespaceIndex != NamespaceIndex) continue;
@@ -551,21 +822,167 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
#endregion
#region Condition Refresh
/// <summary>
/// Reports all active retained alarm conditions during a condition refresh.
/// </summary>
public override ServiceResult ConditionRefresh(OperationContext context, IList<IEventMonitoredItem> monitoredItems)
{
foreach (var kvp in _alarmInAlarmTags)
{
var info = kvp.Value;
if (info.ConditionNode == null || info.ConditionNode.Retain?.Value != true)
continue;
foreach (var item in monitoredItems)
{
item.QueueEvent(info.ConditionNode);
}
}
return ServiceResult.Good;
}
#endregion
#region HistoryRead
/// <inheritdoc />
protected override void HistoryReadRawModified(
ServerSystemContext context,
ReadRawModifiedDetails details,
TimestampsToReturn timestampsToReturn,
IList<HistoryReadValueId> nodesToRead,
IList<HistoryReadResult> results,
IList<ServiceResult> errors,
List<NodeHandle> nodesToProcess,
IDictionary<NodeId, NodeState> cache)
{
foreach (var handle in nodesToProcess)
{
var idx = handle.Index;
var nodeIdStr = handle.NodeId?.Identifier as string;
if (nodeIdStr == null || !_nodeIdToTagReference.TryGetValue(nodeIdStr, out var tagRef))
{
errors[idx] = new ServiceResult(StatusCodes.BadNodeIdUnknown);
continue;
}
if (_historianDataSource == null)
{
errors[idx] = new ServiceResult(StatusCodes.BadHistoryOperationUnsupported);
continue;
}
try
{
var maxValues = details.NumValuesPerNode > 0 ? (int)details.NumValuesPerNode : 0;
var dataValues = _historianDataSource.ReadRawAsync(
tagRef, details.StartTime, details.EndTime, maxValues)
.GetAwaiter().GetResult();
var historyData = new HistoryData();
historyData.DataValues.AddRange(dataValues);
results[idx] = new HistoryReadResult
{
StatusCode = StatusCodes.Good,
HistoryData = new ExtensionObject(historyData)
};
errors[idx] = ServiceResult.Good;
}
catch (Exception ex)
{
Log.Warning(ex, "HistoryRead raw failed for {TagRef}", tagRef);
errors[idx] = new ServiceResult(StatusCodes.BadInternalError);
}
}
}
/// <inheritdoc />
protected override void HistoryReadProcessed(
ServerSystemContext context,
ReadProcessedDetails details,
TimestampsToReturn timestampsToReturn,
IList<HistoryReadValueId> nodesToRead,
IList<HistoryReadResult> results,
IList<ServiceResult> errors,
List<NodeHandle> nodesToProcess,
IDictionary<NodeId, NodeState> cache)
{
foreach (var handle in nodesToProcess)
{
var idx = handle.Index;
var nodeIdStr = handle.NodeId?.Identifier as string;
if (nodeIdStr == null || !_nodeIdToTagReference.TryGetValue(nodeIdStr, out var tagRef))
{
errors[idx] = new ServiceResult(StatusCodes.BadNodeIdUnknown);
continue;
}
if (_historianDataSource == null)
{
errors[idx] = new ServiceResult(StatusCodes.BadHistoryOperationUnsupported);
continue;
}
if (details.AggregateType == null || details.AggregateType.Count == 0)
{
errors[idx] = new ServiceResult(StatusCodes.BadAggregateListMismatch);
continue;
}
var aggregateId = details.AggregateType[idx < details.AggregateType.Count ? idx : 0];
var column = HistorianDataSource.MapAggregateToColumn(aggregateId);
if (column == null)
{
errors[idx] = new ServiceResult(StatusCodes.BadAggregateNotSupported);
continue;
}
try
{
var dataValues = _historianDataSource.ReadAggregateAsync(
tagRef, details.StartTime, details.EndTime,
details.ProcessingInterval, column)
.GetAwaiter().GetResult();
var historyData = new HistoryData();
historyData.DataValues.AddRange(dataValues);
results[idx] = new HistoryReadResult
{
StatusCode = StatusCodes.Good,
HistoryData = new ExtensionObject(historyData)
};
errors[idx] = ServiceResult.Good;
}
catch (Exception ex)
{
Log.Warning(ex, "HistoryRead processed failed for {TagRef}", tagRef);
errors[idx] = new ServiceResult(StatusCodes.BadInternalError);
}
}
}
#endregion
#region Subscription Delivery
/// <summary>
/// Called by the OPC UA framework after monitored items are created on nodes in our namespace.
/// Triggers ref-counted MXAccess subscriptions for the underlying tags.
/// Called by the OPC UA framework during monitored item creation.
/// Triggers ref-counted MXAccess subscriptions early so the runtime value
/// can arrive before the initial publish to the client.
/// </summary>
/// <inheritdoc />
protected override void OnCreateMonitoredItemsComplete(ServerSystemContext context, IList<IMonitoredItem> monitoredItems)
protected override void OnMonitoredItemCreated(ServerSystemContext context, NodeHandle handle, MonitoredItem monitoredItem)
{
foreach (var item in monitoredItems)
{
var nodeIdStr = GetNodeIdString(item);
if (nodeIdStr != null && _nodeIdToTagReference.TryGetValue(nodeIdStr, out var tagRef))
SubscribeTag(tagRef);
}
base.OnMonitoredItemCreated(context, handle, monitoredItem);
var nodeIdStr = handle?.NodeId?.Identifier as string;
if (nodeIdStr != null && _nodeIdToTagReference.TryGetValue(nodeIdStr, out var tagRef))
SubscribeTag(tagRef);
}
/// <summary>
@@ -583,6 +1000,24 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
}
}
/// <summary>
/// Called by the OPC UA framework after monitored items are transferred to a new session.
/// Rebuilds MXAccess subscription bookkeeping when transferred items arrive without local in-memory state.
/// </summary>
/// <inheritdoc />
protected override void OnMonitoredItemsTransferred(ServerSystemContext context, IList<IMonitoredItem> monitoredItems)
{
base.OnMonitoredItemsTransferred(context, monitoredItems);
var transferredTagRefs = monitoredItems
.Select(GetNodeIdString)
.Where(nodeIdStr => nodeIdStr != null && _nodeIdToTagReference.ContainsKey(nodeIdStr))
.Select(nodeIdStr => _nodeIdToTagReference[nodeIdStr!])
.ToList();
RestoreTransferredSubscriptions(transferredTagRefs);
}
private static string? GetNodeIdString(IMonitoredItem item)
{
if (item.ManagerHandle is NodeState node)
@@ -633,23 +1068,206 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
}
}
/// <summary>
/// Rebuilds subscription reference counts for monitored items that were transferred by the OPC UA stack.
/// Existing in-memory bookkeeping is preserved to avoid double-counting normal in-process transfers.
/// </summary>
/// <param name="fullTagReferences">The Galaxy tag references represented by the transferred monitored items.</param>
internal void RestoreTransferredSubscriptions(IEnumerable<string> fullTagReferences)
{
var transferredCounts = fullTagReferences
.GroupBy(tagRef => tagRef, StringComparer.OrdinalIgnoreCase)
.ToDictionary(g => g.Key, g => g.Count(), StringComparer.OrdinalIgnoreCase);
foreach (var kvp in transferredCounts)
{
lock (_lock)
{
if (_subscriptionRefCounts.ContainsKey(kvp.Key))
continue;
_subscriptionRefCounts[kvp.Key] = kvp.Value;
}
_ = _mxAccessClient.SubscribeAsync(kvp.Key, (_, _) => { });
}
}
private void OnMxAccessDataChange(string address, Vtq vtq)
{
if (_tagToVariableNode.TryGetValue(address, out var variable))
Interlocked.Increment(ref _totalMxChangeEvents);
_pendingDataChanges[address] = vtq;
_dataChangeSignal.Set();
}
#endregion
#region Data Change Dispatch
private void StartDispatchThread()
{
_dispatchRunning = true;
_dispatchThread = new Thread(DispatchLoop)
{
try
Name = "OpcUaDataChangeDispatch",
IsBackground = true
};
_dispatchThread.Start();
}
private void StopDispatchThread()
{
_dispatchRunning = false;
_dataChangeSignal.Set();
_dispatchThread?.Join(TimeSpan.FromSeconds(5));
}
private void DispatchLoop()
{
Log.Information("Data change dispatch thread started");
while (_dispatchRunning)
{
_dataChangeSignal.WaitOne(TimeSpan.FromMilliseconds(100));
if (!_dispatchRunning) break;
var keys = _pendingDataChanges.Keys.ToList();
if (keys.Count == 0)
{
var dataValue = CreatePublishedDataValue(address, vtq);
variable.Value = dataValue.Value;
variable.StatusCode = dataValue.StatusCode;
variable.Timestamp = dataValue.SourceTimestamp;
variable.ClearChangeMasks(SystemContext, false);
ReportDispatchMetricsIfDue();
continue;
}
catch (Exception ex)
// Prepare updates outside the Lock — no IO, just value conversion
var updates = new List<(BaseDataVariableState variable, DataValue dataValue)>(keys.Count);
var pendingAlarmEvents = new List<(AlarmInfo info, bool active)>();
foreach (var address in keys)
{
Log.Warning(ex, "Error updating variable node for {Address}", address);
if (_pendingDataChanges.TryRemove(address, out var vtq))
{
if (_tagToVariableNode.TryGetValue(address, out var variable))
{
try
{
var dataValue = CreatePublishedDataValue(address, vtq);
updates.Add((variable, dataValue));
}
catch (Exception ex)
{
Log.Warning(ex, "Error preparing data change for {Address}", address);
}
}
// Check for alarm InAlarm transitions
if (_alarmInAlarmTags.TryGetValue(address, out var alarmInfo))
{
var newInAlarm = vtq.Value is true || vtq.Value is 1 || (vtq.Value is int intVal && intVal != 0);
if (newInAlarm != alarmInfo.LastInAlarm)
{
alarmInfo.LastInAlarm = newInAlarm;
// Read Priority and DescAttrName via MXAccess (outside Lock, safe here)
if (newInAlarm)
{
try
{
var pVtq = _mxAccessClient.ReadAsync(alarmInfo.PriorityTagReference).GetAwaiter().GetResult();
if (pVtq.Value is int ip) alarmInfo.CachedSeverity = (ushort)System.Math.Min(System.Math.Max(ip, 1), 1000);
else if (pVtq.Value is short sp) alarmInfo.CachedSeverity = (ushort)System.Math.Min(System.Math.Max((int)sp, 1), 1000);
}
catch { /* keep previous */ }
try
{
var dVtq = _mxAccessClient.ReadAsync(alarmInfo.DescAttrNameTagReference).GetAwaiter().GetResult();
if (dVtq.Value is string desc && !string.IsNullOrEmpty(desc))
alarmInfo.CachedMessage = desc;
}
catch { /* keep previous */ }
}
pendingAlarmEvents.Add((alarmInfo, newInAlarm));
}
}
}
}
// Apply under Lock so ClearChangeMasks propagates to monitored items
if (updates.Count > 0 || pendingAlarmEvents.Count > 0)
{
lock (Lock)
{
foreach (var (variable, dataValue) in updates)
{
variable.Value = dataValue.Value;
variable.StatusCode = dataValue.StatusCode;
variable.Timestamp = dataValue.SourceTimestamp;
variable.ClearChangeMasks(SystemContext, false);
}
// Report alarm events
foreach (var (info, active) in pendingAlarmEvents)
{
try
{
ReportAlarmEvent(info, active);
}
catch (Exception ex)
{
Log.Warning(ex, "Error reporting alarm event for {Source}", info.SourceName);
}
}
}
}
Interlocked.Add(ref _totalDispatchBatchSize, updates.Count);
Interlocked.Increment(ref _dispatchCycleCount);
ReportDispatchMetricsIfDue();
}
Log.Information("Data change dispatch thread stopped");
}
private void ReportDispatchMetricsIfDue()
{
var now = DateTime.UtcNow;
var elapsed = (now - _lastMetricsReportTime).TotalSeconds;
if (elapsed < 60) return;
var totalEvents = Interlocked.Read(ref _totalMxChangeEvents);
var lastReported = Interlocked.Read(ref _lastReportedMxChangeEvents);
var eventsPerSecond = (totalEvents - lastReported) / elapsed;
Interlocked.Exchange(ref _lastReportedMxChangeEvents, totalEvents);
var batchSize = Interlocked.Read(ref _totalDispatchBatchSize);
var cycles = Interlocked.Read(ref _dispatchCycleCount);
var avgQueueSize = cycles > 0 ? (double)batchSize / cycles : 0;
// Reset rolling counters
Interlocked.Exchange(ref _totalDispatchBatchSize, 0);
Interlocked.Exchange(ref _dispatchCycleCount, 0);
_lastMetricsReportTime = now;
_lastEventsPerSecond = eventsPerSecond;
_lastAvgBatchSize = avgQueueSize;
Log.Information(
"DataChange dispatch: EventsPerSec={EventsPerSec:F1}, AvgBatchSize={AvgBatchSize:F1}, PendingItems={Pending}, TotalEvents={Total}",
eventsPerSecond, avgQueueSize, _pendingDataChanges.Count, totalEvents);
}
/// <inheritdoc />
protected override void Dispose(bool disposing)
{
if (disposing)
{
StopDispatchThread();
_dataChangeSignal.Dispose();
}
base.Dispose(disposing);
}
#endregion

View File

@@ -2,6 +2,7 @@ using System.Collections.Generic;
using Opc.Ua;
using Opc.Ua.Server;
using ZB.MOM.WW.LmxOpcUa.Host.Domain;
using ZB.MOM.WW.LmxOpcUa.Host.Historian;
using ZB.MOM.WW.LmxOpcUa.Host.Metrics;
namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
@@ -14,6 +15,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
private readonly string _galaxyName;
private readonly IMxAccessClient _mxAccessClient;
private readonly PerformanceMetrics _metrics;
private readonly HistorianDataSource? _historianDataSource;
private LmxNodeManager? _nodeManager;
/// <summary>
@@ -39,18 +41,20 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
/// <param name="galaxyName">The Galaxy name used to construct the namespace URI and product URI.</param>
/// <param name="mxAccessClient">The runtime client used by the node manager for live data access.</param>
/// <param name="metrics">The metrics collector shared with the node manager.</param>
public LmxOpcUaServer(string galaxyName, IMxAccessClient mxAccessClient, PerformanceMetrics metrics)
public LmxOpcUaServer(string galaxyName, IMxAccessClient mxAccessClient, PerformanceMetrics metrics,
HistorianDataSource? historianDataSource = null)
{
_galaxyName = galaxyName;
_mxAccessClient = mxAccessClient;
_metrics = metrics;
_historianDataSource = historianDataSource;
}
/// <inheritdoc />
protected override MasterNodeManager CreateMasterNodeManager(IServerInternal server, ApplicationConfiguration configuration)
{
var namespaceUri = $"urn:{_galaxyName}:LmxOpcUa";
_nodeManager = new LmxNodeManager(server, configuration, namespaceUri, _mxAccessClient, _metrics);
_nodeManager = new LmxNodeManager(server, configuration, namespaceUri, _mxAccessClient, _metrics, _historianDataSource);
var nodeManagers = new List<INodeManager> { _nodeManager };
return new MasterNodeManager(server, configuration, null, nodeManagers.ToArray());

View File

@@ -6,6 +6,7 @@ using Opc.Ua.Server;
using Serilog;
using ZB.MOM.WW.LmxOpcUa.Host.Configuration;
using ZB.MOM.WW.LmxOpcUa.Host.Domain;
using ZB.MOM.WW.LmxOpcUa.Host.Historian;
using ZB.MOM.WW.LmxOpcUa.Host.Metrics;
namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
@@ -20,6 +21,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
private readonly OpcUaConfiguration _config;
private readonly IMxAccessClient _mxAccessClient;
private readonly PerformanceMetrics _metrics;
private readonly HistorianDataSource? _historianDataSource;
private ApplicationInstance? _application;
private LmxOpcUaServer? _server;
@@ -44,11 +46,13 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
/// <param name="config">The endpoint and session settings for the OPC UA host.</param>
/// <param name="mxAccessClient">The runtime client used by the node manager for live reads, writes, and subscriptions.</param>
/// <param name="metrics">The metrics collector shared with the node manager and runtime bridge.</param>
public OpcUaServerHost(OpcUaConfiguration config, IMxAccessClient mxAccessClient, PerformanceMetrics metrics)
public OpcUaServerHost(OpcUaConfiguration config, IMxAccessClient mxAccessClient, PerformanceMetrics metrics,
HistorianDataSource? historianDataSource = null)
{
_config = config;
_mxAccessClient = mxAccessClient;
_metrics = metrics;
_historianDataSource = historianDataSource;
}
/// <summary>
@@ -155,7 +159,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
certOk = await _application.CheckApplicationInstanceCertificate(false, 2048);
}
_server = new LmxOpcUaServer(_config.GalaxyName, _mxAccessClient, _metrics);
_server = new LmxOpcUaServer(_config.GalaxyName, _mxAccessClient, _metrics, _historianDataSource);
await _application.Start(_server);
Log.Information("OPC UA server started on opc.tcp://localhost:{Port}{EndpointPath} (namespace={Namespace})",

View File

@@ -54,6 +54,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host
configuration.GetSection("MxAccess").Bind(_config.MxAccess);
configuration.GetSection("GalaxyRepository").Bind(_config.GalaxyRepository);
configuration.GetSection("Dashboard").Bind(_config.Dashboard);
configuration.GetSection("Historian").Bind(_config.Historian);
_mxProxy = new MxProxyAdapter();
_galaxyRepository = new GalaxyRepositoryService(_config.GalaxyRepository);
@@ -152,7 +153,8 @@ namespace ZB.MOM.WW.LmxOpcUa.Host
// Step 8: Create OPC UA server host + node manager
var effectiveMxClient = (IMxAccessClient?)_mxAccessClient ?? _mxAccessClientForWiring ?? new NullMxAccessClient();
_serverHost = new OpcUaServerHost(_config.OpcUa, effectiveMxClient, _metrics);
var historianDataSource = new Historian.HistorianDataSource(_config.Historian);
_serverHost = new OpcUaServerHost(_config.OpcUa, effectiveMxClient, _metrics, historianDataSource);
// Step 9-10: Query hierarchy, start server, build address space
DateTime? initialDeployTime = null;
@@ -202,7 +204,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host
// Step 13: Dashboard
_healthCheck = new HealthCheckService();
_statusReport = new StatusReportService(_healthCheck, _config.Dashboard.RefreshIntervalSeconds);
_statusReport.SetComponents(effectiveMxClient, _metrics, _galaxyStats, _serverHost);
_statusReport.SetComponents(effectiveMxClient, _metrics, _galaxyStats, _serverHost, _nodeManager);
if (_config.Dashboard.Enabled)
{

View File

@@ -29,6 +29,11 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.Status
/// </summary>
public GalaxyInfo Galaxy { get; set; } = new();
/// <summary>
/// Gets or sets MXAccess data change dispatch queue metrics.
/// </summary>
public DataChangeInfo DataChange { get; set; } = new();
/// <summary>
/// Gets or sets per-operation performance statistics used to diagnose bridge throughput and latency.
/// </summary>
@@ -129,6 +134,32 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.Status
public DateTime? LastRebuildTime { get; set; }
}
/// <summary>
/// Dashboard model for MXAccess data change dispatch metrics.
/// </summary>
public class DataChangeInfo
{
/// <summary>
/// Gets or sets the rate of MXAccess data change events received per second.
/// </summary>
public double EventsPerSecond { get; set; }
/// <summary>
/// Gets or sets the average number of items processed per dispatch cycle.
/// </summary>
public double AvgBatchSize { get; set; }
/// <summary>
/// Gets or sets the number of items currently waiting in the dispatch queue.
/// </summary>
public int PendingItems { get; set; }
/// <summary>
/// Gets or sets the total MXAccess data change events received since startup.
/// </summary>
public long TotalEvents { get; set; }
}
/// <summary>
/// Dashboard model for the status page footer.
/// </summary>

View File

@@ -20,6 +20,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.Status
private PerformanceMetrics? _metrics;
private GalaxyRepositoryStats? _galaxyStats;
private OpcUaServerHost? _serverHost;
private LmxNodeManager? _nodeManager;
/// <summary>
/// Initializes a new status report service for the dashboard using the supplied health-check policy and refresh interval.
@@ -40,12 +41,14 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.Status
/// <param name="galaxyStats">The Galaxy repository statistics to surface on the dashboard.</param>
/// <param name="serverHost">The OPC UA server host whose active session count should be reported.</param>
public void SetComponents(IMxAccessClient? mxAccessClient, PerformanceMetrics? metrics,
GalaxyRepositoryStats? galaxyStats, OpcUaServerHost? serverHost)
GalaxyRepositoryStats? galaxyStats, OpcUaServerHost? serverHost,
LmxNodeManager? nodeManager = null)
{
_mxAccessClient = mxAccessClient;
_metrics = metrics;
_galaxyStats = galaxyStats;
_serverHost = serverHost;
_nodeManager = nodeManager;
}
/// <summary>
@@ -78,6 +81,13 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.Status
AttributeCount = _galaxyStats?.AttributeCount ?? 0,
LastRebuildTime = _galaxyStats?.LastRebuildTime
},
DataChange = new DataChangeInfo
{
EventsPerSecond = _nodeManager?.MxChangeEventsPerSecond ?? 0,
AvgBatchSize = _nodeManager?.AverageDispatchBatchSize ?? 0,
PendingItems = _nodeManager?.PendingDataChangeCount ?? 0,
TotalEvents = _nodeManager?.TotalMxChangeEvents ?? 0
},
Operations = _metrics?.GetStatistics() ?? new(),
Footer = new FooterInfo
{
@@ -97,6 +107,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.Status
var sb = new StringBuilder();
sb.AppendLine("<!DOCTYPE html><html><head>");
sb.AppendLine("<meta charset='utf-8'>");
sb.AppendLine($"<meta http-equiv='refresh' content='{_refreshIntervalSeconds}'>");
sb.AppendLine("<title>LmxOpcUa Status</title>");
sb.AppendLine("<style>");
@@ -124,6 +135,11 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.Status
sb.AppendLine($"<p>Active: {data.Subscriptions.ActiveCount}</p>");
sb.AppendLine("</div>");
// Data Change Dispatch panel
sb.AppendLine("<div class='panel gray'><h2>Data Change Dispatch</h2>");
sb.AppendLine($"<p>Events/sec: <b>{data.DataChange.EventsPerSecond:F1}</b> | Avg Batch Size: <b>{data.DataChange.AvgBatchSize:F1}</b> | Pending: {data.DataChange.PendingItems} | Total Events: {data.DataChange.TotalEvents:N0}</p>");
sb.AppendLine("</div>");
// Galaxy Info panel
sb.AppendLine("<div class='panel gray'><h2>Galaxy Info</h2>");
sb.AppendLine($"<p>Galaxy: <b>{data.Galaxy.GalaxyName}</b> | DB: {(data.Galaxy.DbConnected ? "Connected" : "Disconnected")}</p>");

View File

@@ -29,5 +29,10 @@
"Enabled": true,
"Port": 8081,
"RefreshIntervalSeconds": 10
},
"Historian": {
"ConnectionString": "Server=localhost;Database=Runtime;Integrated Security=true;",
"CommandTimeoutSeconds": 30,
"MaxValuesPerRead": 10000
}
}