diff --git a/docs/reqs/OpcUaServerReqs.md b/docs/reqs/OpcUaServerReqs.md index b56a7ee..1df1d5b 100644 --- a/docs/reqs/OpcUaServerReqs.md +++ b/docs/reqs/OpcUaServerReqs.md @@ -51,12 +51,14 @@ Each user-defined attribute on a deployed object shall be represented as an OPC - Each row from `attributes.sql` creates one variable node under the matching object node (matched by `gobject_id`). - Variable node BrowseName and DisplayName are set to `attribute_name`. - Variable node stores `full_tag_reference` as its runtime MXAccess address. -- Variable nodes have AccessLevel = CurrentRead | CurrentWrite (3) by default. +- Variable node AccessLevel is set based on the attribute's `security_classification` per the mapping in `gr/data_type_mapping.md`. +- FreeAccess (0), Operate (1), Tune (4), Configure (5) → AccessLevel = CurrentRead | CurrentWrite (3). +- SecuredWrite (2), VerifiedWrite (3), ViewOnly (6) → AccessLevel = CurrentRead (1). - Objects with no user-defined attributes still appear as object nodes with zero children. ### Details -- Security classification from the attributes query is noted but not enforced at the OPC UA level (Galaxy runtime handles security). +- Security classification determines the OPC UA AccessLevel and UserAccessLevel attributes on each variable node. The OPC UA stack enforces read-only access for nodes with CurrentRead-only access level. - Attributes whose names start with `_` are already filtered by the SQL query. --- diff --git a/gr/data_type_mapping.md b/gr/data_type_mapping.md index 43d7dc2..23c1920 100644 --- a/gr/data_type_mapping.md +++ b/gr/data_type_mapping.md @@ -67,6 +67,22 @@ Example for `TestMachine_001.MachineID` (`is_array=0`): - ValueRank: -1 - ArrayDimensions: (not set) +## Security Classification + +Galaxy attributes have a `security_classification` column that controls the access level required for writes. The attributes query returns this value for each attribute. + +| security_classification | Galaxy Level | OPC UA Access | Description | +|-------------------------|--------------|---------------|-------------| +| 0 | FreeAccess | ReadWrite | No security restrictions | +| 1 | Operate | ReadWrite | Normal operating level (default) | +| 2 | SecuredWrite | ReadOnly | Requires elevated write access | +| 3 | VerifiedWrite | ReadOnly | Requires verified/confirmed write access | +| 4 | Tune | ReadWrite | Tuning-level access | +| 5 | Configure | ReadWrite | Configuration-level access | +| 6 | ViewOnly | ReadOnly | Read-only, no writes permitted | + +Most attributes default to `Operate` (1). Higher values indicate more restrictive write access. `ViewOnly` (6) attributes should be exposed as read-only in OPC UA (`AccessLevel = CurrentRead` only, no `CurrentWrite`). + ## DateTime Conversion Galaxy `Time` (mx_data_type=6) stores DateTime values. OPC UA DateTime is defined as the number of 100-nanosecond intervals since January 1, 1601 (UTC). Ensure the conversion accounts for: diff --git a/gr/queries/attributes.sql b/gr/queries/attributes.sql index 0760700..1b0a44f 100644 --- a/gr/queries/attributes.sql +++ b/gr/queries/attributes.sql @@ -1,13 +1,23 @@ -- Galaxy Object User-Defined Attributes/Tags for OPC UA Server --- Returns user-defined (dynamic) attributes for automation objects. +-- Returns user-defined (dynamic) attributes for deployed automation objects. -- These are the attributes defined on templates and inherited by instances --- via the derived_from_gobject_id chain (e.g., MachineID, MoveInFlag). +-- via the deployed package derivation chain (e.g., MachineID, MoveInFlag). -- -- Use full_tag_reference for read/write operations against the runtime. -- Join with hierarchy.sql results on gobject_id to place attributes in the OPC UA browse tree. -- -- For system/primitive attributes as well, see attributes_extended.sql. -- +-- Only attributes that existed at deploy time are included. The CTE walks +-- package.derived_from_package_id starting from each instance's deployed_package_id, +-- then joins dynamic_attribute on package_id to filter out post-deploy additions. +-- When the same attribute appears at multiple levels, only the shallowest +-- (most-derived) version is kept. +-- +-- Historization detection: an attribute is historized when a primitive_instance +-- with a matching name exists in the deployed package chain and its primitive_definition +-- has primitive_name = 'HistoryExtension'. +-- -- Array dimensions are extracted from the mx_value hex string on the template's -- dynamic_attribute row (bytes 5-6, little-endian uint16 at hex positions 13-16). -- @@ -16,48 +26,98 @@ -- 5 = String, 6 = Time (DateTime), 7 = ElapsedTime (TimeSpan), -- 8 = (reference), 13 = (enumeration), 14 = (custom), 15 = InternationalizedString, 16 = (custom) -;WITH template_chain AS ( - -- Start from each non-template instance - SELECT g.gobject_id, g.derived_from_gobject_id, 0 AS depth +;WITH deployed_package_chain AS ( + -- Start from each deployed instance's deployed package + SELECT + g.gobject_id, + p.package_id, + p.derived_from_package_id, + 0 AS depth FROM gobject g + INNER JOIN package p + ON p.package_id = g.deployed_package_id WHERE g.is_template = 0 + AND g.deployed_package_id <> 0 UNION ALL - -- Walk up the template derivation chain - SELECT tc.gobject_id, t.derived_from_gobject_id, tc.depth + 1 - FROM template_chain tc - INNER JOIN gobject t ON t.gobject_id = tc.derived_from_gobject_id - WHERE tc.derived_from_gobject_id <> 0 AND tc.depth < 10 + -- Walk up the package derivation chain + SELECT + dpc.gobject_id, + p.package_id, + p.derived_from_package_id, + dpc.depth + 1 + FROM deployed_package_chain dpc + INNER JOIN package p + ON p.package_id = dpc.derived_from_package_id + WHERE dpc.derived_from_package_id <> 0 AND dpc.depth < 10 ) -SELECT DISTINCT - g.gobject_id, - g.tag_name, - da.attribute_name, - g.tag_name + '.' + da.attribute_name - + CASE WHEN da.is_array = 1 THEN '[]' ELSE '' END - AS full_tag_reference, - da.mx_data_type, - dt.description AS data_type_name, - da.is_array, - CASE WHEN da.is_array = 1 - THEN CONVERT(int, CONVERT(varbinary(2), - SUBSTRING(da.mx_value, 15, 2) + SUBSTRING(da.mx_value, 13, 2), 2)) - ELSE NULL - END AS array_dimension, - da.mx_attribute_category, - da.security_classification -FROM template_chain tc -INNER JOIN dynamic_attribute da - ON da.gobject_id = tc.derived_from_gobject_id -INNER JOIN gobject g - ON g.gobject_id = tc.gobject_id -INNER JOIN template_definition td - ON td.template_definition_id = g.template_definition_id -LEFT JOIN data_type dt - ON dt.mx_data_type = da.mx_data_type -WHERE td.category_id IN (1, 3, 4, 10, 11, 13, 17, 24, 26) - AND g.is_template = 0 - AND g.deployed_package_id <> 0 - AND da.attribute_name NOT LIKE '[_]%' - AND da.attribute_name NOT LIKE '%.Description' - AND da.mx_attribute_category IN (2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 24) -ORDER BY g.tag_name, da.attribute_name; +SELECT + gobject_id, + tag_name, + attribute_name, + full_tag_reference, + mx_data_type, + data_type_name, + is_array, + array_dimension, + mx_attribute_category, + security_classification, + is_historized, + is_alarm +FROM ( + SELECT + dpc.gobject_id, + g.tag_name, + da.attribute_name, + g.tag_name + '.' + da.attribute_name + + CASE WHEN da.is_array = 1 THEN '[]' ELSE '' END + AS full_tag_reference, + da.mx_data_type, + dt.description AS data_type_name, + da.is_array, + CASE WHEN da.is_array = 1 + THEN CONVERT(int, CONVERT(varbinary(2), + SUBSTRING(da.mx_value, 15, 2) + SUBSTRING(da.mx_value, 13, 2), 2)) + ELSE NULL + END AS array_dimension, + da.mx_attribute_category, + da.security_classification, + CASE WHEN EXISTS ( + SELECT 1 FROM deployed_package_chain dpc2 + INNER JOIN primitive_instance pi + ON pi.package_id = dpc2.package_id + AND pi.primitive_name = da.attribute_name + INNER JOIN primitive_definition pd + ON pd.primitive_definition_id = pi.primitive_definition_id + AND pd.primitive_name = 'HistoryExtension' + WHERE dpc2.gobject_id = dpc.gobject_id + ) THEN 1 ELSE 0 END AS is_historized, + CASE WHEN EXISTS ( + SELECT 1 FROM deployed_package_chain dpc2 + INNER JOIN primitive_instance pi + ON pi.package_id = dpc2.package_id + AND pi.primitive_name = da.attribute_name + INNER JOIN primitive_definition pd + ON pd.primitive_definition_id = pi.primitive_definition_id + AND pd.primitive_name = 'AlarmExtension' + WHERE dpc2.gobject_id = dpc.gobject_id + ) THEN 1 ELSE 0 END AS is_alarm, + ROW_NUMBER() OVER ( + PARTITION BY dpc.gobject_id, da.attribute_name + ORDER BY dpc.depth + ) AS rn + FROM deployed_package_chain dpc + INNER JOIN dynamic_attribute da + ON da.package_id = dpc.package_id + INNER JOIN gobject g + ON g.gobject_id = dpc.gobject_id + INNER JOIN template_definition td + ON td.template_definition_id = g.template_definition_id + LEFT JOIN data_type dt + ON dt.mx_data_type = da.mx_data_type + WHERE td.category_id IN (1, 3, 4, 10, 11, 13, 17, 24, 26) + AND da.attribute_name NOT LIKE '[_]%' + AND da.attribute_name NOT LIKE '%.Description' + AND da.mx_attribute_category IN (2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 24) +) ranked +WHERE rn = 1 +ORDER BY tag_name, attribute_name; diff --git a/gr/queries/attributes_extended.sql b/gr/queries/attributes_extended.sql index aaf8fd8..12a3b43 100644 --- a/gr/queries/attributes_extended.sql +++ b/gr/queries/attributes_extended.sql @@ -1,14 +1,20 @@ -- Galaxy Object Attributes/Tags for OPC UA Server --- Returns all runtime-readable attributes for automation objects. +-- Returns all runtime-readable attributes for deployed automation objects. -- Use full_tag_reference for read/write operations against the runtime. -- Join with hierarchy.sql results on gobject_id to place attributes in the OPC UA browse tree. -- -- Two sources of attributes: -- 1. attribute_definition (via primitive_instance) — system/primitive attributes --- Derived from internal_runtime_attributes view logic. +-- Joined via the instance's deployed_package_id to exclude undeployed changes. -- 2. dynamic_attribute — user-defined attributes (e.g., MachineID, MoveInFlag) --- Defined on templates, inherited by instances via derived_from_gobject_id chain. --- Requires recursive CTE to walk the template derivation hierarchy. +-- Walked via the deployed package derivation chain (package.derived_from_package_id) +-- to only include attributes that existed at deploy time. +-- When the same attribute appears at multiple levels (e.g., instance override and +-- base template), only the shallowest (most-derived) version is kept. +-- +-- Historization detection: a dynamic attribute is historized when a primitive_instance +-- with a matching name exists in the deployed package chain and its primitive_definition +-- has primitive_name = 'HistoryExtension'. -- -- Attribute category filter (mx_attribute_category): -- 2-11, 24 = runtime readable attributes @@ -24,19 +30,88 @@ -- 5 = String, 6 = Time (DateTime), 7 = ElapsedTime (TimeSpan), -- 8 = (reference), 13 = (enumeration), 14 = (custom), 15 = InternationalizedString, 16 = (custom) -;WITH template_chain AS ( - -- Start from each non-template instance - SELECT g.gobject_id, g.derived_from_gobject_id, 0 AS depth +;WITH deployed_package_chain AS ( + -- Start from each deployed instance's deployed package + SELECT + g.gobject_id, + p.package_id, + p.derived_from_package_id, + 0 AS depth FROM gobject g + INNER JOIN package p + ON p.package_id = g.deployed_package_id WHERE g.is_template = 0 + AND g.deployed_package_id <> 0 UNION ALL - -- Walk up the template derivation chain - SELECT tc.gobject_id, t.derived_from_gobject_id, tc.depth + 1 - FROM template_chain tc - INNER JOIN gobject t ON t.gobject_id = tc.derived_from_gobject_id - WHERE tc.derived_from_gobject_id <> 0 AND tc.depth < 10 + -- Walk up the package derivation chain + SELECT + dpc.gobject_id, + p.package_id, + p.derived_from_package_id, + dpc.depth + 1 + FROM deployed_package_chain dpc + INNER JOIN package p + ON p.package_id = dpc.derived_from_package_id + WHERE dpc.derived_from_package_id <> 0 AND dpc.depth < 10 +), +-- Rank dynamic attributes: shallowest (most-derived) wins per object + attribute +ranked_dynamic AS ( + SELECT + dpc.gobject_id, + g.tag_name, + da.attribute_name, + g.tag_name + '.' + da.attribute_name + + CASE WHEN da.is_array = 1 THEN '[]' ELSE '' END + AS full_tag_reference, + da.mx_data_type, + dt.description AS data_type_name, + da.is_array, + CASE WHEN da.is_array = 1 + THEN CONVERT(int, CONVERT(varbinary(2), + SUBSTRING(da.mx_value, 15, 2) + SUBSTRING(da.mx_value, 13, 2), 2)) + ELSE NULL + END AS array_dimension, + da.mx_attribute_category, + da.security_classification, + CASE WHEN EXISTS ( + SELECT 1 FROM deployed_package_chain dpc2 + INNER JOIN primitive_instance pi + ON pi.package_id = dpc2.package_id + AND pi.primitive_name = da.attribute_name + INNER JOIN primitive_definition pd + ON pd.primitive_definition_id = pi.primitive_definition_id + AND pd.primitive_name = 'HistoryExtension' + WHERE dpc2.gobject_id = dpc.gobject_id + ) THEN 1 ELSE 0 END AS is_historized, + CASE WHEN EXISTS ( + SELECT 1 FROM deployed_package_chain dpc2 + INNER JOIN primitive_instance pi + ON pi.package_id = dpc2.package_id + AND pi.primitive_name = da.attribute_name + INNER JOIN primitive_definition pd + ON pd.primitive_definition_id = pi.primitive_definition_id + AND pd.primitive_name = 'AlarmExtension' + WHERE dpc2.gobject_id = dpc.gobject_id + ) THEN 1 ELSE 0 END AS is_alarm, + ROW_NUMBER() OVER ( + PARTITION BY dpc.gobject_id, da.attribute_name + ORDER BY dpc.depth + ) AS rn + FROM deployed_package_chain dpc + INNER JOIN dynamic_attribute da + ON da.package_id = dpc.package_id + INNER JOIN gobject g + ON g.gobject_id = dpc.gobject_id + INNER JOIN template_definition td + ON td.template_definition_id = g.template_definition_id + LEFT JOIN data_type dt + ON dt.mx_data_type = da.mx_data_type + WHERE td.category_id IN (1, 3, 4, 10, 11, 13, 17, 24, 26) + AND da.attribute_name NOT LIKE '[_]%' + AND da.attribute_name NOT LIKE '%.Description' + AND da.mx_attribute_category IN (2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 24) ) -SELECT DISTINCT +SELECT gobject_id, tag_name, primitive_name, @@ -48,9 +123,11 @@ SELECT DISTINCT array_dimension, mx_attribute_category, security_classification, + is_historized, + is_alarm, attribute_source FROM ( - -- Part 1: System/primitive attributes (from attribute_definition) + -- Part 1: System/primitive attributes (from attribute_definition via deployed package) SELECT g.gobject_id, g.tag_name, @@ -71,6 +148,8 @@ FROM ( END AS array_dimension, ad.mx_attribute_category, ad.security_classification, + CAST(0 AS int) AS is_historized, + CAST(0 AS int) AS is_alarm, 'primitive' AS attribute_source FROM gobject g INNER JOIN instance i @@ -79,7 +158,7 @@ FROM ( ON td.template_definition_id = g.template_definition_id AND td.runtime_clsid <> '{00000000-0000-0000-0000-000000000000}' INNER JOIN package p - ON p.package_id = g.checked_in_package_id + ON p.package_id = g.deployed_package_id INNER JOIN primitive_instance pi ON pi.package_id = p.package_id AND pi.property_bitmask & 0x10 <> 0x10 @@ -95,40 +174,23 @@ FROM ( UNION ALL - -- Part 2: User-defined attributes (from dynamic_attribute via template chain) + -- Part 2: User-defined attributes (shallowest override from deployed package chain) SELECT - g.gobject_id, - g.tag_name, + gobject_id, + tag_name, '' AS primitive_name, - da.attribute_name, - g.tag_name + '.' + da.attribute_name - + CASE WHEN da.is_array = 1 THEN '[]' ELSE '' END - AS full_tag_reference, - da.mx_data_type, - dt.description AS data_type_name, - da.is_array, - CASE WHEN da.is_array = 1 - THEN CONVERT(int, CONVERT(varbinary(2), - SUBSTRING(da.mx_value, 15, 2) + SUBSTRING(da.mx_value, 13, 2), 2)) - ELSE NULL - END AS array_dimension, - da.mx_attribute_category, - da.security_classification, + attribute_name, + full_tag_reference, + mx_data_type, + data_type_name, + is_array, + array_dimension, + mx_attribute_category, + security_classification, + is_historized, + is_alarm, 'dynamic' AS attribute_source - FROM template_chain tc - INNER JOIN dynamic_attribute da - ON da.gobject_id = tc.derived_from_gobject_id - INNER JOIN gobject g - ON g.gobject_id = tc.gobject_id - INNER JOIN template_definition td - ON td.template_definition_id = g.template_definition_id - LEFT JOIN data_type dt - ON dt.mx_data_type = da.mx_data_type - WHERE td.category_id IN (1, 3, 4, 10, 11, 13, 17, 24, 26) - AND g.is_template = 0 - AND g.deployed_package_id <> 0 - AND da.attribute_name NOT LIKE '[_]%' - AND da.attribute_name NOT LIKE '%.Description' - AND da.mx_attribute_category IN (2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 24) + FROM ranked_dynamic + WHERE rn = 1 ) all_attributes ORDER BY tag_name, primitive_name, attribute_name; diff --git a/hda_plan.md b/hda_plan.md new file mode 100644 index 0000000..7f5c438 --- /dev/null +++ b/hda_plan.md @@ -0,0 +1,305 @@ +# Alarm & History Detection Plan + +## Context + +Galaxy attributes can be alarms (with `AlarmExtension` primitives) or historized (with `HistoryExtension` primitives). This plan documents how to detect these in the Galaxy Repository database and maps Galaxy alarm properties to OPC UA Alarms & Conditions concepts. + +## 1. Detection in the Galaxy Repository + +### Alarm Detection + +An attribute is an alarm when a `primitive_instance` exists in the deployed package chain where: +- `primitive_instance.primitive_name` matches the `dynamic_attribute.attribute_name` +- `primitive_definition.primitive_name = 'AlarmExtension'` + +Example: `TestMachine_001.TestAlarm001` has a `primitive_instance` named `TestAlarm001` with `primitive_definition.primitive_name = 'AlarmExtension'`. + +### History Detection + +Already implemented in the attributes queries. Same pattern but checking for `primitive_definition.primitive_name = 'HistoryExtension'`. + +### Query Pattern + +Both use the same EXISTS subquery against the deployed package chain: + +```sql +CASE WHEN EXISTS ( + SELECT 1 FROM deployed_package_chain dpc2 + INNER JOIN primitive_instance pi + ON pi.package_id = dpc2.package_id + AND pi.primitive_name = da.attribute_name + INNER JOIN primitive_definition pd + ON pd.primitive_definition_id = pi.primitive_definition_id + AND pd.primitive_name = 'AlarmExtension' -- or 'HistoryExtension' + WHERE dpc2.gobject_id = dpc.gobject_id +) THEN 1 ELSE 0 END AS is_alarm +``` + +## 2. Galaxy Alarm Properties (AlarmExtension) + +The `AlarmExtension` primitive exposes 24 public attributes per alarm. These are already returned by the extended attributes query as primitive child attributes (e.g., `TestMachine_001.TestAlarm001.Acked`). + +### Key Properties and Runtime Values + +| Galaxy Attribute | Data Type | Runtime Example | Description | +|---|---|---|---| +| **InAlarm** | Boolean | `False` | Whether alarm condition is currently active | +| **Acked** | Boolean | `False` | Whether alarm has been acknowledged | +| **Condition** | Boolean | `False` | Raw condition value (input to alarm logic) | +| **ActiveAlarmState** | Boolean | `True` | Active state of alarm processing | +| **Priority** | Integer | `500` | Alarm priority (1-999, higher = more urgent) | +| **Category** | Enum | `1` (Discrete) | Alarm category type | +| **AlarmMode** | Enum | `1` (Enable) | Operational mode: 1=Enable, 2=Disable, 3=Silence | +| **AckMsg** | String | `""` | Acknowledgment message/comment | +| **TimeAlarmOn** | DateTime | | When alarm condition triggered | +| **TimeAlarmOff** | DateTime | | When alarm condition cleared | +| **TimeAlarmAcked** | DateTime | | When alarm was acknowledged | +| **AlarmInhibit** | Boolean | | Inhibit alarm processing | +| **AlarmShelved** | Boolean | `False` | Whether alarm is shelved | +| **AlarmShelveNode** | String | | Node that shelved the alarm | +| **AlarmShelveReason** | String | | Reason for shelving | +| **AlarmShelveUser** | String | | User who shelved | +| **AlarmShelveStartTime** | DateTime | | When shelve started | +| **AlarmShelveStopTime** | DateTime | | When shelve ends | +| **AlarmShelveCmd** | String | | Shelving command | +| **AlarmModeCmd** | Enum | | Command to change alarm mode | +| **AlarmSourceAttr** | Reference | | Source attribute reference | +| **DescAttrName** | String | | Descriptive attribute name | +| **Alarm.TimeDeadband** | ElapsedTime | | Time deadband for alarm | + +### Alarm Enum Values + +**AlarmMode**: Enable (1), Disable (2), Silence (3) + +**Category**: Discrete (1), Value LoLo, Value Lo, Value Hi, Value HiHi, ROC, Deviation (and more) + +## 3. Mapping Galaxy Alarm Properties to OPC UA + +### OPC UA Alarm Type Hierarchy + +``` +ConditionType + └─ AcknowledgeableConditionType + └─ AlarmConditionType + ├─ DiscreteAlarmType ← for Galaxy boolean alarms (Category=Discrete) + ├─ OffNormalAlarmType ← alternative for boolean alarms + └─ LimitAlarmType ← for analog alarms (Hi/Lo/HiHi/LoLo) + ├─ ExclusiveLimitAlarmType + └─ NonExclusiveLimitAlarmType +``` + +Galaxy boolean alarms (like TestAlarm001/002/003) map to **DiscreteAlarmType** or **OffNormalAlarmType**. + +### Property Mapping + +| Galaxy Property | OPC UA Alarm Property | Notes | +|---|---|---| +| `InAlarm` | `ActiveState.Id` | Boolean: alarm is active | +| `Acked` | `AckedState.Id` | Boolean: alarm acknowledged | +| `Priority` | `Severity` | Galaxy 1-999 maps to OPC UA 1-1000 | +| `AckMsg` | `Comment` | Acknowledgment message | +| `Condition` | Source variable value | The boolean condition input | +| `AlarmMode` (Enable/Disable) | `EnabledState.Id` | Enable=true, Disable/Silence=false | +| `ActiveAlarmState` | `Retain` | Whether condition should be retained | +| `TimeAlarmOn` | `ActiveState.TransitionTime` | When alarm became active | +| `TimeAlarmOff` | `ActiveState.TransitionTime` | When alarm became inactive | +| `TimeAlarmAcked` | `AckedState.TransitionTime` | When alarm was acknowledged | +| `AlarmShelved` | `ShelvedState` (current state) | Maps to Unshelved/OneShotShelved/TimedShelved | +| `AlarmShelveStartTime` | `ShelvingState.UnshelveTime` | Computed from start/stop times | +| `Category` | `ConditionClassId` | Identifies condition class | +| `AlarmInhibit` | `SuppressedState.Id` | Alarm suppression | +| `DescAttrName` | `Message` | Description/message for alarm | + +### Properties Not Available in Galaxy + +These OPC UA properties have no direct Galaxy equivalent and would use defaults: +- `ConfirmedState` — Galaxy doesn't have a confirmed concept (default: true) +- `BranchId` — Galaxy doesn't support branching (default: null) +- `Quality` — Use the source variable's StatusCode + +## 4. Implementation Approach + +### OPC UA SDK Classes + +- `AlarmConditionState` — main class for alarm nodes +- `TwoStateVariableType` — for ActiveState, AckedState, EnabledState, ShelvedState +- `ShelvedStateMachineType` — for shelving state management + +### Key Implementation Steps + +1. **Detect alarms in the query** — add `is_alarm` column to attributes queries (same pattern as `is_historized`) +2. **Create alarm condition nodes** — for attributes where `is_alarm = 1`, create an `AlarmConditionState` instead of a plain `BaseDataVariableState` +3. **Map properties** — subscribe to the Galaxy alarm sub-attributes (InAlarm, Acked, Priority, etc.) and update the OPC UA alarm state +4. **Event notifications** — when alarm state changes arrive via MXAccess `OnDataChange`, raise OPC UA alarm events via `ReportEvent()` +5. **Condition refresh** — implement `ConditionRefresh()` to send current alarm states to newly subscribing clients +6. **Acknowledge method** — implement the OPC UA `Acknowledge` method to write back to Galaxy via MXAccess + +### Galaxy Alarm Types in the Database + +51 alarm-related primitive definitions exist. The main ones relevant to OPC UA mapping: + +| Galaxy Primitive | OPC UA Alarm Type | +|---|---| +| `AlarmExtension` (Boolean) | `DiscreteAlarmType` / `OffNormalAlarmType` | +| `AnalogExtension.LevelAlarms.Hi/HiHi/Lo/LoLo` | `ExclusiveLimitAlarmType` or `NonExclusiveLimitAlarmType` | +| `AnalogExtension.ROCAlarms` | `RateOfChangeAlarmType` | +| `AnalogExtension.DeviationAlarms` | `DeviationAlarmType` | + +### Files to Modify + +- `gr/queries/attributes_extended.sql` — add `is_alarm` column +- `gr/queries/attributes.sql` — add `is_alarm` column +- `src/.../Domain/GalaxyAttributeInfo.cs` — add `IsAlarm` property +- `src/.../GalaxyRepository/GalaxyRepositoryService.cs` — read `is_alarm` from query results +- `src/.../OpcUa/LmxNodeManager.cs` — create `AlarmConditionState` nodes for alarm attributes +- New: alarm state update handler mapping MXAccess data changes to OPC UA alarm events +- `tools/opcuacli-dotnet/Commands/AlarmsCommand.cs` — NEW CLI command +- `tools/opcuacli-dotnet/README.md` — add `alarms` command documentation + +## 5. OPC UA CLI Tool — Alarms Command + +Add an `alarms` command to `tools/opcuacli-dotnet/` for subscribing to and displaying OPC UA alarm events. + +### Usage + +```bash +# Subscribe to all alarm events under a node (e.g., TestMachine_001) +dotnet run -- alarms -u opc.tcp://localhost:4840/LmxOpcUa -n "ns=1;s=TestMachine_001" + +# Subscribe to all events under the root ZB node +dotnet run -- alarms -u opc.tcp://localhost:4840/LmxOpcUa -n "ns=1;s=ZB" + +# Subscribe to all server events (Server node) +dotnet run -- alarms -u opc.tcp://localhost:4840/LmxOpcUa + +# Request a condition refresh to get current alarm states immediately +dotnet run -- alarms -u opc.tcp://localhost:4840/LmxOpcUa -n "ns=1;s=TestMachine_001" --refresh +``` + +### Command Options + +| Flag | Description | +|------|-------------| +| `-u, --url` | OPC UA server endpoint URL (required) | +| `-n, --node` | Node ID to monitor for events (default: Server node i=2253) | +| `--refresh` | Request a ConditionRefresh after subscribing to get current states | +| `-i, --interval` | Publishing interval in milliseconds (default: 1000) | + +### Output Format + +``` +Subscribed to alarm events on ns=1;s=TestMachine_001 (interval: 1000ms). Press Ctrl+C to stop. + +[2026-03-26T04:30:12.000Z] ALARM TestMachine_001.TestAlarm001 + State: Active, Unacknowledged + Severity: 500 + Message: Discrete alarm triggered + Source: ns=1;s=TestMachine_001.TestAlarm001 + Retain: True + +[2026-03-26T04:30:45.000Z] ALARM TestMachine_001.TestAlarm001 + State: Active, Acknowledged + Severity: 500 + Message: Discrete alarm triggered + AckUser: operator1 + +[2026-03-26T04:31:02.000Z] ALARM TestMachine_001.TestAlarm001 + State: Inactive, Acknowledged + Severity: 500 + Retain: False +``` + +### Implementation + +New file: `tools/opcuacli-dotnet/Commands/AlarmsCommand.cs` + +OPC UA alarm events are received through event-type monitored items, not regular data-change subscriptions. The key differences from the `subscribe` command: + +```csharp +// Create an event monitored item (not a data-change item) +var item = new MonitoredItem(subscription.DefaultItem) +{ + StartNodeId = nodeId, + DisplayName = "AlarmMonitor", + SamplingInterval = interval, + NodeClass = NodeClass.Object, + // Subscribe to events, not data changes + AttributeId = Attributes.EventNotifier, + // Select which event fields to return + Filter = CreateEventFilter() +}; +``` + +#### Event Filter + +Select the standard alarm fields to display: + +```csharp +private static EventFilter CreateEventFilter() +{ + var filter = new EventFilter(); + filter.AddSelectClause(ObjectTypeIds.BaseEventType, "EventId"); + filter.AddSelectClause(ObjectTypeIds.BaseEventType, "EventType"); + filter.AddSelectClause(ObjectTypeIds.BaseEventType, "SourceName"); + filter.AddSelectClause(ObjectTypeIds.BaseEventType, "Time"); + filter.AddSelectClause(ObjectTypeIds.BaseEventType, "Message"); + filter.AddSelectClause(ObjectTypeIds.BaseEventType, "Severity"); + filter.AddSelectClause(ObjectTypeIds.ConditionType, "ConditionName"); + filter.AddSelectClause(ObjectTypeIds.ConditionType, "Retain"); + filter.AddSelectClause(ObjectTypeIds.AcknowledgeableConditionType, "AckedState/Id"); + filter.AddSelectClause(ObjectTypeIds.AlarmConditionType, "ActiveState/Id"); + filter.AddSelectClause(ObjectTypeIds.AlarmConditionType, "EnabledState/Id"); + filter.AddSelectClause(ObjectTypeIds.AlarmConditionType, "SuppressedOrShelved"); + return filter; +} +``` + +#### Event Notification Handler + +```csharp +item.Notification += (monitoredItem, e) => +{ + if (e.NotificationValue is EventFieldList eventFields) + { + var time = eventFields.EventFields[3].Value as DateTime?; + var sourceName = eventFields.EventFields[2].Value as string; + var message = (eventFields.EventFields[4].Value as LocalizedText)?.Text; + var severity = eventFields.EventFields[5].Value as ushort?; + var ackedState = eventFields.EventFields[8].Value as bool?; + var activeState = eventFields.EventFields[9].Value as bool?; + var retain = eventFields.EventFields[7].Value as bool?; + + var stateDesc = FormatAlarmState(activeState, ackedState); + Console.WriteLine($"[{time:O}] ALARM {sourceName}"); + Console.WriteLine($" State: {stateDesc}"); + Console.WriteLine($" Severity: {severity}"); + if (!string.IsNullOrEmpty(message)) + Console.WriteLine($" Message: {message}"); + Console.WriteLine($" Retain: {retain}"); + Console.WriteLine(); + } +}; +``` + +#### Condition Refresh + +When `--refresh` is specified, call `ConditionRefresh` after creating the subscription to receive the current state of all active alarms: + +```csharp +if (refresh) +{ + await subscription.ConditionRefreshAsync(); + await console.Output.WriteLineAsync("Condition refresh requested."); +} +``` + +#### State Formatting + +```csharp +static string FormatAlarmState(bool? active, bool? acked) +{ + var activePart = active == true ? "Active" : "Inactive"; + var ackedPart = acked == true ? "Acknowledged" : "Unacknowledged"; + return $"{activePart}, {ackedPart}"; +} +``` diff --git a/historian_plan.md b/historian_plan.md new file mode 100644 index 0000000..65f4219 --- /dev/null +++ b/historian_plan.md @@ -0,0 +1,314 @@ +# OPC UA Historical Data Access Plan + +## Context + +Galaxy attributes with `HistoryExtension` primitives are historized by the Wonderware Historian. The Historian exposes its data via SQL queries against the `Runtime` database. This plan documents how to implement OPC UA Historical Data Access (HDA) so OPC UA clients can read historical values through the server. + +## 1. Wonderware Historian Data Source + +### Connection + +- **Database**: `Runtime` on `localhost` (Windows Auth) +- **Constraint**: History views require a `WHERE TagName='...'` clause — queries without a tag filter will fail. + +### History View Schema (31 columns) + +Key columns for OPC UA HDA: + +| Column | Type | Description | +|---|---|---| +| `DateTime` | datetime2 | Timestamp of the value | +| `TagName` | nvarchar(256) | Galaxy tag reference (e.g., `TestMachine_001.TestHistoryValue`) | +| `Value` | float | Numeric value | +| `vValue` | nvarchar(4000) | String representation of value | +| `Quality` | tinyint | Quality code (0=Good, 1=Bad, 133=Uncertain) | +| `QualityDetail` | int | Detailed quality (192=Good) | +| `OPCQuality` | int | OPC-style quality code | + +### Raw Data Query + +```sql +SELECT DateTime, Value, vValue, Quality, QualityDetail +FROM Runtime.dbo.History +WHERE TagName = 'TestMachine_001.TestHistoryValue' + AND DateTime BETWEEN @StartTime AND @EndTime +ORDER BY DateTime +``` + +### Aggregate Data (AnalogSummaryHistory) + +The Historian provides pre-calculated aggregates via the `AnalogSummaryHistory` view: + +| Column | Description | +|---|---| +| `StartDateTime` | Start of aggregate interval | +| `EndDateTime` | End of aggregate interval | +| `First` | First value in interval | +| `Last` | Last value in interval | +| `Minimum` | Minimum value | +| `Maximum` | Maximum value | +| `Average` | Average value | +| `StdDev` | Standard deviation | +| `Integral` | Time-weighted integral | +| `ValueCount` | Number of values | + +```sql +SELECT StartDateTime, EndDateTime, Average, Minimum, Maximum, ValueCount +FROM Runtime.dbo.AnalogSummaryHistory +WHERE TagName = 'TestMachine_001.TestHistoryValue' + AND StartDateTime BETWEEN @StartTime AND @EndTime + AND wwResolution = @IntervalMs +``` + +### Retrieval Modes + +| Mode | Description | +|---|---| +| `DELTA` | Change-based retrieval (default) — returns values when they changed | +| `CYCLIC` | Periodic sampling — returns interpolated values at fixed intervals | + +### Quality Mapping + +| Historian Quality | OPC UA StatusCode | +|---|---| +| 0 (Good) | `Good` (0x00000000) | +| 1 (Bad) | `Bad` (0x80000000) | +| 133 (Uncertain) | `Uncertain` (0x40000000) | + +### Test Data + +Tag: `TestMachine_001.TestHistoryValue` (Analog, Integer) +- 4 records from 2026-03-26 00:44 to 01:09 +- Values: 0, 3, 4, 7, 9 +- InterpolationType: STAIRSTEP + +## 2. OPC UA HDA Implementation + +### Marking Variables as Historized + +For attributes where `is_historized = 1` from the Galaxy query: + +```csharp +variable.Historizing = true; +variable.AccessLevel |= AccessLevels.HistoryRead; +variable.UserAccessLevel |= AccessLevels.HistoryRead; +``` + +This tells OPC UA clients the variable supports `HistoryRead` requests. + +### Server-Side Handler + +Override `HistoryRead` on `LmxNodeManager` (inherits from `CustomNodeManager2`): + +```csharp +public override void HistoryRead( + OperationContext context, + HistoryReadDetails details, + TimestampsToReturn timestampsToReturn, + bool releaseContinuationPoints, + IList nodesToRead, + IList results, + IList errors) +``` + +Dispatch based on `details` type: +- `ReadRawModifiedDetails` → `HistoryReadRaw` → query `Runtime.dbo.History` +- `ReadProcessedDetails` → `HistoryReadProcessed` → query `Runtime.dbo.AnalogSummaryHistory` +- `ReadAtTimeDetails` → `HistoryReadAtTime` → query with `wwRetrievalMode = 'Cyclic'` + +### ReadRaw Implementation + +Map `HistoryReadRawModifiedDetails` to a Historian SQL query: + +| OPC UA Parameter | SQL Mapping | +|---|---| +| `StartTime` | `DateTime >= @StartTime` | +| `EndTime` | `DateTime <= @EndTime` | +| `NumValuesPerNode` | `TOP @NumValues` | +| `ReturnBounds` | Include one value before StartTime and one after EndTime | + +Result: populate `HistoryData` with `DataValue` list: +```csharp +new DataValue +{ + Value = row.Value, + SourceTimestamp = row.DateTime, + StatusCode = MapQuality(row.Quality) +} +``` + +### ReadProcessed Implementation + +Map `HistoryReadProcessedDetails` to `AnalogSummaryHistory`: + +| OPC UA Aggregate | Historian Column | +|---|---| +| `Average` | `Average` | +| `Minimum` | `Minimum` | +| `Maximum` | `Maximum` | +| `Count` | `ValueCount` | +| `Start` | `First` | +| `End` | `Last` | +| `StandardDeviationPopulation` | `StdDev` | + +`ProcessingInterval` maps to `wwResolution` (milliseconds). + +### Continuation Points for Paging + +When `NumValuesPerNode` limits the result: + +1. Query `NumValuesPerNode + 1` rows +2. If more exist, save a continuation point (store last timestamp + query params) +3. Return `StatusCodes.GoodMoreData` with the continuation point +4. On next request, restore the continuation point and resume from last timestamp + +Use `Session.SaveHistoryContinuationPoint()` / `RestoreHistoryContinuationPoint()` to manage state. + +### Tag Name Resolution + +The `FullTagReference` stored on each variable node (e.g., `TestMachine_001.TestHistoryValue`) is exactly the `TagName` used in the Historian query — no translation needed. + +## 3. Galaxy Repository Detection + +Already implemented: `is_historized` column in the attributes queries detects `HistoryExtension` primitives in the deployed package chain. + +## 4. Implementation Steps + +### Phase 1: Mark historized nodes +- Read `is_historized` from query results into `GalaxyAttributeInfo` +- In `LmxNodeManager.CreateAttributeVariable`, set `Historizing = true` and add `HistoryRead` to `AccessLevel` + +### Phase 2: Historian data source +- New class: `HistorianDataSource` — executes SQL queries against `Runtime.dbo.History` and `AnalogSummaryHistory` +- Connection string configurable in `appsettings.json` +- Parameterized queries only (no dynamic SQL) + +### Phase 3: HistoryRead handler +- Override `HistoryRead` on `LmxNodeManager` +- Implement `HistoryReadRaw` — query `History` view, map results to `HistoryData` +- Implement `HistoryReadProcessed` — query `AnalogSummaryHistory`, map aggregates +- Implement continuation points for large result sets + +### Phase 4: Testing +- Unit tests for quality mapping, tag name resolution, SQL parameter building +- Integration test: create a historized variable, verify `Historizing = true` and `HistoryRead` access level +- Manual test: use OPC UA client to read historical data from deployed server + +## 5. OPC UA CLI Tool — History Command + +Add a `historyread` command to `tools/opcuacli-dotnet/` for manual testing of HDA. + +### Usage + +```bash +# Read raw history (last 24 hours) +dotnet run -- historyread -u opc.tcp://localhost:4840/LmxOpcUa -n "ns=1;s=TestMachine_001.TestHistoryValue" + +# Read raw history with explicit time range +dotnet run -- historyread -u opc.tcp://localhost:4840/LmxOpcUa -n "ns=1;s=TestMachine_001.TestHistoryValue" --start "2026-03-25" --end "2026-03-30" + +# Read with max values limit +dotnet run -- historyread -u opc.tcp://localhost:4840/LmxOpcUa -n "ns=1;s=TestMachine_001.TestHistoryValue" --start "2026-03-25" --end "2026-03-30" --max 100 + +# Read processed/aggregate history (1-hour intervals, Average) +dotnet run -- historyread -u opc.tcp://localhost:4840/LmxOpcUa -n "ns=1;s=TestMachine_001.TestHistoryValue" --start "2026-03-25" --end "2026-03-30" --aggregate Average --interval 3600000 +``` + +### Command Options + +| Flag | Description | +|------|-------------| +| `-u, --url` | OPC UA server endpoint URL (required) | +| `-n, --node` | Node ID to read history for (required) | +| `--start` | Start time, ISO 8601 or date string (default: 24 hours ago) | +| `--end` | End time, ISO 8601 or date string (default: now) | +| `--max` | Maximum number of values to return (default: 1000) | +| `--aggregate` | Aggregate function: Average, Minimum, Maximum, Count (default: none = raw) | +| `--interval` | Processing interval in milliseconds for aggregates (default: 3600000 = 1 hour) | + +### Output Format + +**Raw history:** +``` +History for ns=1;s=TestMachine_001.TestHistoryValue (2026-03-25 → 2026-03-30) + +Timestamp Value Status +2026-03-26T00:44:03.000Z 0 Good +2026-03-26T00:52:17.000Z 3 Good +2026-03-26T01:01:44.000Z 7 Good +2026-03-26T01:09:00.000Z 9 Good + +4 values returned. +``` + +**Aggregate history:** +``` +History for ns=1;s=TestMachine_001.TestHistoryValue (Average, interval=3600000ms) + +Timestamp Value Status +2026-03-26T00:00:00.000Z 4.75 Good + +1 values returned. +``` + +### Implementation + +New file: `tools/opcuacli-dotnet/Commands/HistoryReadCommand.cs` + +Uses the OPC UA client SDK's `Session.ReadRawHistory` and `Session.ReadProcessedHistory` methods (or `HistoryReadAsync` with appropriate `HistoryReadDetails`): + +```csharp +// Raw read +var details = new ReadRawModifiedDetails +{ + StartTime = startTime, + EndTime = endTime, + NumValuesPerNode = (uint)maxValues, + IsReadModified = false, + ReturnBounds = false +}; + +// Processed read +var details = new ReadProcessedDetails +{ + StartTime = startTime, + EndTime = endTime, + ProcessingInterval = intervalMs, + AggregateType = new NodeIdCollection { aggregateNodeId } +}; +``` + +Follow the same pattern as existing commands: use `OpcUaHelper.ConnectAsync()`, parse NodeId, call history read, print results. + +### Continuation Point Handling + +If the server returns `GoodMoreData` with a continuation point, automatically follow up with subsequent requests until all data is retrieved or `--max` is reached. + +### README Update + +Add `historyread` section to `tools/opcuacli-dotnet/README.md` documenting the new command. + +## 6. Files to Modify/Create + +| File | Change | +|---|---| +| `src/.../Domain/GalaxyAttributeInfo.cs` | Add `IsHistorized` property | +| `src/.../GalaxyRepository/GalaxyRepositoryService.cs` | Read `is_historized` column | +| `src/.../OpcUa/LmxNodeManager.cs` | Set `Historizing`/`AccessLevel` for historized nodes; override `HistoryRead` | +| `src/.../Configuration/HistorianConfiguration.cs` | NEW — connection string, query timeout | +| `src/.../Historian/HistorianDataSource.cs` | NEW — SQL queries against Runtime DB | +| `appsettings.json` | Add `Historian` section with connection string | +| `tools/opcuacli-dotnet/Commands/HistoryReadCommand.cs` | NEW — `historyread` CLI command | +| `tools/opcuacli-dotnet/README.md` | Add `historyread` command documentation | + +## 6. Configuration + +```json +{ + "Historian": { + "ConnectionString": "Server=localhost;Database=Runtime;Integrated Security=true;", + "CommandTimeoutSeconds": 30, + "MaxValuesPerRead": 10000 + } +} +``` diff --git a/src/ZB.MOM.WW.LmxOpcUa.Host/Configuration/AppConfiguration.cs b/src/ZB.MOM.WW.LmxOpcUa.Host/Configuration/AppConfiguration.cs index 64a8f35..878e159 100644 --- a/src/ZB.MOM.WW.LmxOpcUa.Host/Configuration/AppConfiguration.cs +++ b/src/ZB.MOM.WW.LmxOpcUa.Host/Configuration/AppConfiguration.cs @@ -24,5 +24,10 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.Configuration /// Gets or sets the embedded dashboard settings used to surface service health to operators. /// public DashboardConfiguration Dashboard { get; set; } = new DashboardConfiguration(); + + /// + /// Gets or sets the Wonderware Historian connection settings used to serve OPC UA historical data. + /// + public HistorianConfiguration Historian { get; set; } = new HistorianConfiguration(); } } diff --git a/src/ZB.MOM.WW.LmxOpcUa.Host/Configuration/HistorianConfiguration.cs b/src/ZB.MOM.WW.LmxOpcUa.Host/Configuration/HistorianConfiguration.cs new file mode 100644 index 0000000..7a698b4 --- /dev/null +++ b/src/ZB.MOM.WW.LmxOpcUa.Host/Configuration/HistorianConfiguration.cs @@ -0,0 +1,23 @@ +namespace ZB.MOM.WW.LmxOpcUa.Host.Configuration +{ + /// + /// Wonderware Historian database configuration for OPC UA historical data access. + /// + public class HistorianConfiguration + { + /// + /// Gets or sets the connection string for the Wonderware Historian Runtime database. + /// + public string ConnectionString { get; set; } = "Server=localhost;Database=Runtime;Integrated Security=true;"; + + /// + /// Gets or sets the SQL command timeout in seconds for historian queries. + /// + public int CommandTimeoutSeconds { get; set; } = 30; + + /// + /// Gets or sets the maximum number of values returned per HistoryRead request. + /// + public int MaxValuesPerRead { get; set; } = 10000; + } +} diff --git a/src/ZB.MOM.WW.LmxOpcUa.Host/Domain/GalaxyAttributeInfo.cs b/src/ZB.MOM.WW.LmxOpcUa.Host/Domain/GalaxyAttributeInfo.cs index 131e33b..d433eb0 100644 --- a/src/ZB.MOM.WW.LmxOpcUa.Host/Domain/GalaxyAttributeInfo.cs +++ b/src/ZB.MOM.WW.LmxOpcUa.Host/Domain/GalaxyAttributeInfo.cs @@ -54,5 +54,21 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.Domain /// Gets or sets the source classification that explains whether the attribute comes from configuration, calculation, or runtime data. /// public string AttributeSource { get; set; } = ""; + + /// + /// Gets or sets the Galaxy security classification that determines OPC UA write access. + /// 0=FreeAccess, 1=Operate (default), 2=SecuredWrite, 3=VerifiedWrite, 4=Tune, 5=Configure, 6=ViewOnly. + /// + public int SecurityClassification { get; set; } = 1; + + /// + /// Gets or sets a value indicating whether the attribute has a HistoryExtension primitive and is historized by the Wonderware Historian. + /// + public bool IsHistorized { get; set; } + + /// + /// Gets or sets a value indicating whether the attribute has an AlarmExtension primitive and is an alarm. + /// + public bool IsAlarm { get; set; } } } diff --git a/src/ZB.MOM.WW.LmxOpcUa.Host/Domain/SecurityClassificationMapper.cs b/src/ZB.MOM.WW.LmxOpcUa.Host/Domain/SecurityClassificationMapper.cs new file mode 100644 index 0000000..4d3f058 --- /dev/null +++ b/src/ZB.MOM.WW.LmxOpcUa.Host/Domain/SecurityClassificationMapper.cs @@ -0,0 +1,28 @@ +namespace ZB.MOM.WW.LmxOpcUa.Host.Domain +{ + /// + /// Maps Galaxy security classification values to OPC UA write access decisions. + /// See gr/data_type_mapping.md for the full mapping table. + /// + public static class SecurityClassificationMapper + { + /// + /// Determines whether an attribute with the given security classification should allow writes. + /// + /// The Galaxy security classification value. + /// for FreeAccess (0), Operate (1), Tune (4), Configure (5); + /// for SecuredWrite (2), VerifiedWrite (3), ViewOnly (6). + public static bool IsWritable(int securityClassification) + { + switch (securityClassification) + { + case 2: // SecuredWrite + case 3: // VerifiedWrite + case 6: // ViewOnly + return false; + default: + return true; + } + } + } +} diff --git a/src/ZB.MOM.WW.LmxOpcUa.Host/GalaxyRepository/GalaxyRepositoryService.cs b/src/ZB.MOM.WW.LmxOpcUa.Host/GalaxyRepository/GalaxyRepositoryService.cs index 10bed71..4a04437 100644 --- a/src/ZB.MOM.WW.LmxOpcUa.Host/GalaxyRepository/GalaxyRepositoryService.cs +++ b/src/ZB.MOM.WW.LmxOpcUa.Host/GalaxyRepository/GalaxyRepositoryService.cs @@ -51,62 +51,132 @@ WHERE td.category_id IN (1, 3, 4, 10, 11, 13, 17, 24, 26) ORDER BY parent_gobject_id, g.tag_name"; private const string AttributesSql = @" -;WITH template_chain AS ( - SELECT g.gobject_id, g.derived_from_gobject_id, 0 AS depth +;WITH deployed_package_chain AS ( + SELECT g.gobject_id, p.package_id, p.derived_from_package_id, 0 AS depth FROM gobject g - WHERE g.is_template = 0 + INNER JOIN package p ON p.package_id = g.deployed_package_id + WHERE g.is_template = 0 AND g.deployed_package_id <> 0 UNION ALL - SELECT tc.gobject_id, t.derived_from_gobject_id, tc.depth + 1 - FROM template_chain tc - INNER JOIN gobject t ON t.gobject_id = tc.derived_from_gobject_id - WHERE tc.derived_from_gobject_id <> 0 AND tc.depth < 10 + SELECT dpc.gobject_id, p.package_id, p.derived_from_package_id, dpc.depth + 1 + FROM deployed_package_chain dpc + INNER JOIN package p ON p.package_id = dpc.derived_from_package_id + WHERE dpc.derived_from_package_id <> 0 AND dpc.depth < 10 ) -SELECT DISTINCT - g.gobject_id, - g.tag_name, - da.attribute_name, - g.tag_name + '.' + da.attribute_name - + CASE WHEN da.is_array = 1 THEN '[]' ELSE '' END - AS full_tag_reference, - da.mx_data_type, - dt.description AS data_type_name, - da.is_array, - CASE WHEN da.is_array = 1 - THEN CONVERT(int, CONVERT(varbinary(2), - SUBSTRING(da.mx_value, 15, 2) + SUBSTRING(da.mx_value, 13, 2), 2)) - ELSE NULL - END AS array_dimension, - da.mx_attribute_category, - da.security_classification -FROM template_chain tc -INNER JOIN dynamic_attribute da - ON da.gobject_id = tc.derived_from_gobject_id -INNER JOIN gobject g - ON g.gobject_id = tc.gobject_id -INNER JOIN template_definition td - ON td.template_definition_id = g.template_definition_id -LEFT JOIN data_type dt - ON dt.mx_data_type = da.mx_data_type -WHERE td.category_id IN (1, 3, 4, 10, 11, 13, 17, 24, 26) - AND g.is_template = 0 - AND g.deployed_package_id <> 0 - AND da.attribute_name NOT LIKE '[_]%' - AND da.attribute_name NOT LIKE '%.Description' - AND da.mx_attribute_category IN (2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 24) -ORDER BY g.tag_name, da.attribute_name"; +SELECT gobject_id, tag_name, attribute_name, full_tag_reference, + mx_data_type, data_type_name, is_array, array_dimension, + mx_attribute_category, security_classification, is_historized, is_alarm +FROM ( + SELECT + dpc.gobject_id, + g.tag_name, + da.attribute_name, + g.tag_name + '.' + da.attribute_name + + CASE WHEN da.is_array = 1 THEN '[]' ELSE '' END + AS full_tag_reference, + da.mx_data_type, + dt.description AS data_type_name, + da.is_array, + CASE WHEN da.is_array = 1 + THEN CONVERT(int, CONVERT(varbinary(2), + SUBSTRING(da.mx_value, 15, 2) + SUBSTRING(da.mx_value, 13, 2), 2)) + ELSE NULL + END AS array_dimension, + da.mx_attribute_category, + da.security_classification, + CASE WHEN EXISTS ( + SELECT 1 FROM deployed_package_chain dpc2 + INNER JOIN primitive_instance pi ON pi.package_id = dpc2.package_id AND pi.primitive_name = da.attribute_name + INNER JOIN primitive_definition pd ON pd.primitive_definition_id = pi.primitive_definition_id AND pd.primitive_name = 'HistoryExtension' + WHERE dpc2.gobject_id = dpc.gobject_id + ) THEN 1 ELSE 0 END AS is_historized, + CASE WHEN EXISTS ( + SELECT 1 FROM deployed_package_chain dpc2 + INNER JOIN primitive_instance pi ON pi.package_id = dpc2.package_id AND pi.primitive_name = da.attribute_name + INNER JOIN primitive_definition pd ON pd.primitive_definition_id = pi.primitive_definition_id AND pd.primitive_name = 'AlarmExtension' + WHERE dpc2.gobject_id = dpc.gobject_id + ) THEN 1 ELSE 0 END AS is_alarm, + ROW_NUMBER() OVER ( + PARTITION BY dpc.gobject_id, da.attribute_name + ORDER BY dpc.depth + ) AS rn + FROM deployed_package_chain dpc + INNER JOIN dynamic_attribute da + ON da.package_id = dpc.package_id + INNER JOIN gobject g + ON g.gobject_id = dpc.gobject_id + INNER JOIN template_definition td + ON td.template_definition_id = g.template_definition_id + LEFT JOIN data_type dt + ON dt.mx_data_type = da.mx_data_type + WHERE td.category_id IN (1, 3, 4, 10, 11, 13, 17, 24, 26) + AND da.attribute_name NOT LIKE '[_]%' + AND da.attribute_name NOT LIKE '%.Description' + AND da.mx_attribute_category IN (2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 24) +) ranked +WHERE rn = 1 +ORDER BY tag_name, attribute_name"; private const string ExtendedAttributesSql = @" -;WITH template_chain AS ( - SELECT g.gobject_id, g.derived_from_gobject_id, 0 AS depth +;WITH deployed_package_chain AS ( + SELECT g.gobject_id, p.package_id, p.derived_from_package_id, 0 AS depth FROM gobject g - WHERE g.is_template = 0 + INNER JOIN package p ON p.package_id = g.deployed_package_id + WHERE g.is_template = 0 AND g.deployed_package_id <> 0 UNION ALL - SELECT tc.gobject_id, t.derived_from_gobject_id, tc.depth + 1 - FROM template_chain tc - INNER JOIN gobject t ON t.gobject_id = tc.derived_from_gobject_id - WHERE tc.derived_from_gobject_id <> 0 AND tc.depth < 10 + SELECT dpc.gobject_id, p.package_id, p.derived_from_package_id, dpc.depth + 1 + FROM deployed_package_chain dpc + INNER JOIN package p ON p.package_id = dpc.derived_from_package_id + WHERE dpc.derived_from_package_id <> 0 AND dpc.depth < 10 +), +ranked_dynamic AS ( + SELECT + dpc.gobject_id, + g.tag_name, + da.attribute_name, + g.tag_name + '.' + da.attribute_name + + CASE WHEN da.is_array = 1 THEN '[]' ELSE '' END + AS full_tag_reference, + da.mx_data_type, + dt.description AS data_type_name, + da.is_array, + CASE WHEN da.is_array = 1 + THEN CONVERT(int, CONVERT(varbinary(2), + SUBSTRING(da.mx_value, 15, 2) + SUBSTRING(da.mx_value, 13, 2), 2)) + ELSE NULL + END AS array_dimension, + da.mx_attribute_category, + da.security_classification, + CASE WHEN EXISTS ( + SELECT 1 FROM deployed_package_chain dpc2 + INNER JOIN primitive_instance pi ON pi.package_id = dpc2.package_id AND pi.primitive_name = da.attribute_name + INNER JOIN primitive_definition pd ON pd.primitive_definition_id = pi.primitive_definition_id AND pd.primitive_name = 'HistoryExtension' + WHERE dpc2.gobject_id = dpc.gobject_id + ) THEN 1 ELSE 0 END AS is_historized, + CASE WHEN EXISTS ( + SELECT 1 FROM deployed_package_chain dpc2 + INNER JOIN primitive_instance pi ON pi.package_id = dpc2.package_id AND pi.primitive_name = da.attribute_name + INNER JOIN primitive_definition pd ON pd.primitive_definition_id = pi.primitive_definition_id AND pd.primitive_name = 'AlarmExtension' + WHERE dpc2.gobject_id = dpc.gobject_id + ) THEN 1 ELSE 0 END AS is_alarm, + ROW_NUMBER() OVER ( + PARTITION BY dpc.gobject_id, da.attribute_name + ORDER BY dpc.depth + ) AS rn + FROM deployed_package_chain dpc + INNER JOIN dynamic_attribute da + ON da.package_id = dpc.package_id + INNER JOIN gobject g + ON g.gobject_id = dpc.gobject_id + INNER JOIN template_definition td + ON td.template_definition_id = g.template_definition_id + LEFT JOIN data_type dt + ON dt.mx_data_type = da.mx_data_type + WHERE td.category_id IN (1, 3, 4, 10, 11, 13, 17, 24, 26) + AND da.attribute_name NOT LIKE '[_]%' + AND da.attribute_name NOT LIKE '%.Description' + AND da.mx_attribute_category IN (2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 24) ) -SELECT DISTINCT +SELECT gobject_id, tag_name, primitive_name, @@ -118,6 +188,8 @@ SELECT DISTINCT array_dimension, mx_attribute_category, security_classification, + is_historized, + is_alarm, attribute_source FROM ( SELECT @@ -140,6 +212,8 @@ FROM ( END AS array_dimension, ad.mx_attribute_category, ad.security_classification, + CAST(0 AS int) AS is_historized, + CAST(0 AS int) AS is_alarm, 'primitive' AS attribute_source FROM gobject g INNER JOIN instance i @@ -148,7 +222,7 @@ FROM ( ON td.template_definition_id = g.template_definition_id AND td.runtime_clsid <> '{00000000-0000-0000-0000-000000000000}' INNER JOIN package p - ON p.package_id = g.checked_in_package_id + ON p.package_id = g.deployed_package_id INNER JOIN primitive_instance pi ON pi.package_id = p.package_id AND pi.property_bitmask & 0x10 <> 0x10 @@ -165,39 +239,22 @@ FROM ( UNION ALL SELECT - g.gobject_id, - g.tag_name, + gobject_id, + tag_name, '' AS primitive_name, - da.attribute_name, - g.tag_name + '.' + da.attribute_name - + CASE WHEN da.is_array = 1 THEN '[]' ELSE '' END - AS full_tag_reference, - da.mx_data_type, - dt.description AS data_type_name, - da.is_array, - CASE WHEN da.is_array = 1 - THEN CONVERT(int, CONVERT(varbinary(2), - SUBSTRING(da.mx_value, 15, 2) + SUBSTRING(da.mx_value, 13, 2), 2)) - ELSE NULL - END AS array_dimension, - da.mx_attribute_category, - da.security_classification, + attribute_name, + full_tag_reference, + mx_data_type, + data_type_name, + is_array, + array_dimension, + mx_attribute_category, + security_classification, + is_historized, + is_alarm, 'dynamic' AS attribute_source - FROM template_chain tc - INNER JOIN dynamic_attribute da - ON da.gobject_id = tc.derived_from_gobject_id - INNER JOIN gobject g - ON g.gobject_id = tc.gobject_id - INNER JOIN template_definition td - ON td.template_definition_id = g.template_definition_id - LEFT JOIN data_type dt - ON dt.mx_data_type = da.mx_data_type - WHERE td.category_id IN (1, 3, 4, 10, 11, 13, 17, 24, 26) - AND g.is_template = 0 - AND g.deployed_package_id <> 0 - AND da.attribute_name NOT LIKE '[_]%' - AND da.attribute_name NOT LIKE '%.Description' - AND da.mx_attribute_category IN (2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 24) + FROM ranked_dynamic + WHERE rn = 1 ) all_attributes ORDER BY tag_name, primitive_name, attribute_name"; @@ -279,9 +336,10 @@ ORDER BY tag_name, primitive_name, attribute_name"; } /// - /// Reads a row from the standard attributes query (10 columns). + /// Reads a row from the standard attributes query (12 columns). /// Columns: gobject_id, tag_name, attribute_name, full_tag_reference, mx_data_type, - /// data_type_name, is_array, array_dimension, mx_attribute_category, security_classification + /// data_type_name, is_array, array_dimension, mx_attribute_category, + /// security_classification, is_historized, is_alarm /// private static GalaxyAttributeInfo ReadStandardAttribute(SqlDataReader reader) { @@ -294,15 +352,18 @@ ORDER BY tag_name, primitive_name, attribute_name"; MxDataType = Convert.ToInt32(reader.GetValue(4)), DataTypeName = reader.IsDBNull(5) ? "" : reader.GetString(5), IsArray = Convert.ToBoolean(reader.GetValue(6)), - ArrayDimension = reader.IsDBNull(7) ? null : (int?)Convert.ToInt32(reader.GetValue(7)) + ArrayDimension = reader.IsDBNull(7) ? null : (int?)Convert.ToInt32(reader.GetValue(7)), + SecurityClassification = Convert.ToInt32(reader.GetValue(9)), + IsHistorized = Convert.ToInt32(reader.GetValue(10)) == 1, + IsAlarm = Convert.ToInt32(reader.GetValue(11)) == 1 }; } /// - /// Reads a row from the extended attributes query (12 columns). + /// Reads a row from the extended attributes query (14 columns). /// Columns: gobject_id, tag_name, primitive_name, attribute_name, full_tag_reference, /// mx_data_type, data_type_name, is_array, array_dimension, - /// mx_attribute_category, security_classification, attribute_source + /// mx_attribute_category, security_classification, is_historized, is_alarm, attribute_source /// private static GalaxyAttributeInfo ReadExtendedAttribute(SqlDataReader reader) { @@ -317,7 +378,10 @@ ORDER BY tag_name, primitive_name, attribute_name"; DataTypeName = reader.IsDBNull(6) ? "" : reader.GetString(6), IsArray = Convert.ToBoolean(reader.GetValue(7)), ArrayDimension = reader.IsDBNull(8) ? null : (int?)Convert.ToInt32(reader.GetValue(8)), - AttributeSource = reader.IsDBNull(11) ? "" : reader.GetString(11) + SecurityClassification = Convert.ToInt32(reader.GetValue(10)), + IsHistorized = Convert.ToInt32(reader.GetValue(11)) == 1, + IsAlarm = Convert.ToInt32(reader.GetValue(12)) == 1, + AttributeSource = reader.IsDBNull(13) ? "" : reader.GetString(13) }; } diff --git a/src/ZB.MOM.WW.LmxOpcUa.Host/Historian/HistorianDataSource.cs b/src/ZB.MOM.WW.LmxOpcUa.Host/Historian/HistorianDataSource.cs new file mode 100644 index 0000000..3591412 --- /dev/null +++ b/src/ZB.MOM.WW.LmxOpcUa.Host/Historian/HistorianDataSource.cs @@ -0,0 +1,156 @@ +using System; +using System.Collections.Generic; +using System.Data.SqlClient; +using System.Threading; +using System.Threading.Tasks; +using Opc.Ua; +using Serilog; +using ZB.MOM.WW.LmxOpcUa.Host.Configuration; + +namespace ZB.MOM.WW.LmxOpcUa.Host.Historian +{ + /// + /// Reads historical data from the Wonderware Historian Runtime database. + /// + public class HistorianDataSource + { + private static readonly ILogger Log = Serilog.Log.ForContext(); + + private readonly HistorianConfiguration _config; + + public HistorianDataSource(HistorianConfiguration config) + { + _config = config; + } + + /// + /// Reads raw historical values for a tag from the Historian. + /// + public async Task> ReadRawAsync( + string tagName, DateTime startTime, DateTime endTime, int maxValues, + CancellationToken ct = default) + { + var results = new List(); + var sql = maxValues > 0 + ? "SELECT TOP (@MaxValues) DateTime, Value, vValue, Quality FROM Runtime.dbo.History WHERE TagName = @TagName AND DateTime >= @StartTime AND DateTime <= @EndTime ORDER BY DateTime" + : "SELECT DateTime, Value, vValue, Quality FROM Runtime.dbo.History WHERE TagName = @TagName AND DateTime >= @StartTime AND DateTime <= @EndTime ORDER BY DateTime"; + + using var conn = new SqlConnection(_config.ConnectionString); + await conn.OpenAsync(ct); + + using var cmd = new SqlCommand(sql, conn) { CommandTimeout = _config.CommandTimeoutSeconds }; + cmd.Parameters.AddWithValue("@TagName", tagName); + cmd.Parameters.AddWithValue("@StartTime", startTime); + cmd.Parameters.AddWithValue("@EndTime", endTime); + if (maxValues > 0) + cmd.Parameters.AddWithValue("@MaxValues", maxValues); + + using var reader = await cmd.ExecuteReaderAsync(ct); + while (await reader.ReadAsync(ct)) + { + var timestamp = reader.GetDateTime(0); + object? value; + if (!reader.IsDBNull(1)) + value = reader.GetDouble(1); + else if (!reader.IsDBNull(2)) + value = reader.GetString(2); + else + value = null; + var quality = reader.IsDBNull(3) ? (byte)0 : Convert.ToByte(reader.GetValue(3)); + + results.Add(new DataValue + { + Value = new Variant(value), + SourceTimestamp = timestamp, + ServerTimestamp = timestamp, + StatusCode = MapQuality(quality) + }); + } + + Log.Debug("HistoryRead raw: {Tag} returned {Count} values ({Start} to {End})", + tagName, results.Count, startTime, endTime); + + return results; + } + + /// + /// Reads aggregate historical values for a tag from the Historian. + /// + public async Task> ReadAggregateAsync( + string tagName, DateTime startTime, DateTime endTime, + double intervalMs, string aggregateColumn, + CancellationToken ct = default) + { + var results = new List(); + var sql = $"SELECT StartDateTime, [{aggregateColumn}] FROM Runtime.dbo.AnalogSummaryHistory " + + "WHERE TagName = @TagName AND StartDateTime >= @StartTime AND StartDateTime <= @EndTime " + + "AND wwResolution = @Resolution ORDER BY StartDateTime"; + + using var conn = new SqlConnection(_config.ConnectionString); + await conn.OpenAsync(ct); + + using var cmd = new SqlCommand(sql, conn) { CommandTimeout = _config.CommandTimeoutSeconds }; + cmd.Parameters.AddWithValue("@TagName", tagName); + cmd.Parameters.AddWithValue("@StartTime", startTime); + cmd.Parameters.AddWithValue("@EndTime", endTime); + cmd.Parameters.AddWithValue("@Resolution", (int)intervalMs); + + using var reader = await cmd.ExecuteReaderAsync(ct); + while (await reader.ReadAsync(ct)) + { + var timestamp = reader.GetDateTime(0); + var value = reader.IsDBNull(1) ? (object?)null : reader.GetDouble(1); + + results.Add(new DataValue + { + Value = new Variant(value), + SourceTimestamp = timestamp, + ServerTimestamp = timestamp, + StatusCode = value != null ? StatusCodes.Good : StatusCodes.BadNoData + }); + } + + Log.Debug("HistoryRead aggregate ({Aggregate}): {Tag} returned {Count} values", + aggregateColumn, tagName, results.Count); + + return results; + } + + /// + /// Maps Wonderware Historian quality codes to OPC UA StatusCodes. + /// + public static StatusCode MapQuality(byte quality) + { + if (quality == 0) + return StatusCodes.Good; + if (quality == 1) + return StatusCodes.Bad; + if (quality >= 128) + return StatusCodes.Uncertain; + return StatusCodes.Bad; + } + + /// + /// Maps an OPC UA aggregate NodeId to the corresponding Historian column name. + /// Returns null if the aggregate is not supported. + /// + public static string? MapAggregateToColumn(NodeId aggregateId) + { + if (aggregateId == ObjectIds.AggregateFunction_Average) + return "Average"; + if (aggregateId == ObjectIds.AggregateFunction_Minimum) + return "Minimum"; + if (aggregateId == ObjectIds.AggregateFunction_Maximum) + return "Maximum"; + if (aggregateId == ObjectIds.AggregateFunction_Count) + return "ValueCount"; + if (aggregateId == ObjectIds.AggregateFunction_Start) + return "First"; + if (aggregateId == ObjectIds.AggregateFunction_End) + return "Last"; + if (aggregateId == ObjectIds.AggregateFunction_StandardDeviationPopulation) + return "StdDev"; + return null; + } + } +} diff --git a/src/ZB.MOM.WW.LmxOpcUa.Host/OpcUa/AddressSpaceBuilder.cs b/src/ZB.MOM.WW.LmxOpcUa.Host/OpcUa/AddressSpaceBuilder.cs index e12f442..5a05c86 100644 --- a/src/ZB.MOM.WW.LmxOpcUa.Host/OpcUa/AddressSpaceBuilder.cs +++ b/src/ZB.MOM.WW.LmxOpcUa.Host/OpcUa/AddressSpaceBuilder.cs @@ -84,6 +84,27 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa /// Gets or sets the declared array length when the attribute is a fixed-size array. /// public int? ArrayDimension { get; set; } + + /// + /// Gets or sets the primitive name that groups the attribute under a sub-object node. + /// Empty for root-level attributes. + /// + public string PrimitiveName { get; set; } = ""; + + /// + /// Gets or sets the Galaxy security classification that determines OPC UA write access. + /// + public int SecurityClassification { get; set; } = 1; + + /// + /// Gets or sets a value indicating whether the attribute is historized. + /// + public bool IsHistorized { get; set; } + + /// + /// Gets or sets a value indicating whether the attribute is an alarm. + /// + public bool IsAlarm { get; set; } } /// @@ -175,7 +196,11 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa FullTagReference = attr.FullTagReference, MxDataType = attr.MxDataType, IsArray = attr.IsArray, - ArrayDimension = attr.ArrayDimension + ArrayDimension = attr.ArrayDimension, + PrimitiveName = attr.PrimitiveName ?? "", + SecurityClassification = attr.SecurityClassification, + IsHistorized = attr.IsHistorized, + IsAlarm = attr.IsAlarm }); model.NodeIdToTagReference[GetNodeIdentifier(attr)] = attr.FullTagReference; diff --git a/src/ZB.MOM.WW.LmxOpcUa.Host/OpcUa/LmxNodeManager.cs b/src/ZB.MOM.WW.LmxOpcUa.Host/OpcUa/LmxNodeManager.cs index 2ea77dc..1d66feb 100644 --- a/src/ZB.MOM.WW.LmxOpcUa.Host/OpcUa/LmxNodeManager.cs +++ b/src/ZB.MOM.WW.LmxOpcUa.Host/OpcUa/LmxNodeManager.cs @@ -1,10 +1,13 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; +using System.Threading; using Opc.Ua; using Opc.Ua.Server; using Serilog; using ZB.MOM.WW.LmxOpcUa.Host.Domain; +using ZB.MOM.WW.LmxOpcUa.Host.Historian; using ZB.MOM.WW.LmxOpcUa.Host.Metrics; namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa @@ -19,6 +22,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa private readonly IMxAccessClient _mxAccessClient; private readonly PerformanceMetrics _metrics; + private readonly HistorianDataSource? _historianDataSource; private readonly string _namespaceUri; // NodeId → full_tag_reference for read/write resolution @@ -32,6 +36,21 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa private readonly object _lock = new object(); private IDictionary>? _externalReferences; + // Data change dispatch queue: decouples MXAccess STA callbacks from OPC UA framework Lock + private readonly ConcurrentDictionary _pendingDataChanges = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + private readonly AutoResetEvent _dataChangeSignal = new AutoResetEvent(false); + private Thread? _dispatchThread; + private volatile bool _dispatchRunning; + + // Dispatch queue metrics + private long _totalMxChangeEvents; + private long _lastReportedMxChangeEvents; + private long _totalDispatchBatchSize; + private long _dispatchCycleCount; + private DateTime _lastMetricsReportTime = DateTime.UtcNow; + private double _lastEventsPerSecond; + private double _lastAvgBatchSize; + private sealed class TagMetadata { public int MxDataType { get; set; } @@ -39,6 +58,22 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa public int? ArrayDimension { get; set; } } + // Alarm tracking: maps InAlarm tag reference → alarm source info + private readonly Dictionary _alarmInAlarmTags = new Dictionary(StringComparer.OrdinalIgnoreCase); + + private sealed class AlarmInfo + { + public string SourceTagReference { get; set; } = ""; + public NodeId SourceNodeId { get; set; } = NodeId.Null; + public string SourceName { get; set; } = ""; + public bool LastInAlarm { get; set; } + public AlarmConditionState? ConditionNode { get; set; } + public string PriorityTagReference { get; set; } = ""; + public string DescAttrNameTagReference { get; set; } = ""; + public ushort CachedSeverity { get; set; } + public string CachedMessage { get; set; } = ""; + } + /// /// Gets the mapping from OPC UA node identifiers to the Galaxy tag references used for runtime I/O. /// @@ -54,6 +89,26 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa /// public int ObjectNodeCount { get; private set; } + /// + /// Gets the total number of MXAccess data change events received since startup. + /// + public long TotalMxChangeEvents => Interlocked.Read(ref _totalMxChangeEvents); + + /// + /// Gets the number of items currently waiting in the dispatch queue. + /// + public int PendingDataChangeCount => _pendingDataChanges.Count; + + /// + /// Gets the most recently computed MXAccess data change events per second. + /// + public double MxChangeEventsPerSecond => _lastEventsPerSecond; + + /// + /// Gets the most recently computed average dispatch batch size (proxy for queue depth under load). + /// + public double AverageDispatchBatchSize => _lastAvgBatchSize; + /// /// Initializes a new node manager for the Galaxy-backed OPC UA namespace. /// @@ -67,15 +122,20 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa ApplicationConfiguration configuration, string namespaceUri, IMxAccessClient mxAccessClient, - PerformanceMetrics metrics) + PerformanceMetrics metrics, + HistorianDataSource? historianDataSource = null) : base(server, configuration, namespaceUri) { _namespaceUri = namespaceUri; _mxAccessClient = mxAccessClient; _metrics = metrics; + _historianDataSource = historianDataSource; // Wire up data change delivery _mxAccessClient.OnTagValueChanged += OnMxAccessDataChange; + + // Start background dispatch thread + StartDispatchThread(); } /// @@ -100,6 +160,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa _nodeIdToTagReference.Clear(); _tagToVariableNode.Clear(); _tagMetadata.Clear(); + _alarmInAlarmTags.Clear(); VariableNodeCount = 0; ObjectNodeCount = 0; @@ -111,9 +172,10 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa .GroupBy(a => a.GobjectId) .ToDictionary(g => g.Key, g => g.ToList()); - // Root folder + // Root folder — enable events so alarm events propagate to clients subscribed at root var rootFolder = CreateFolder(null, "ZB", "ZB"); rootFolder.NodeId = new NodeId("ZB", NamespaceIndex); + rootFolder.EventNotifier = EventNotifiers.SubscribeToEvents; rootFolder.AddReference(ReferenceTypeIds.Organizes, true, ObjectIds.ObjectsFolder); AddPredefinedNode(SystemContext, rootFolder); @@ -161,18 +223,201 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa // Create variable nodes for this object's attributes if (attrsByObject.TryGetValue(obj.GobjectId, out var objAttrs)) { - foreach (var attr in objAttrs) + // Group by primitive_name: empty = direct child, non-empty = sub-object + var byPrimitive = objAttrs + .GroupBy(a => a.PrimitiveName ?? "") + .OrderBy(g => g.Key); + + // Collect primitive group names so we know which direct attributes have children + var primitiveGroupNames = new HashSet( + byPrimitive.Select(g => g.Key).Where(k => !string.IsNullOrEmpty(k)), + StringComparer.OrdinalIgnoreCase); + + // Track variable nodes created for direct attributes that also have primitive children + var variableNodes = new Dictionary(StringComparer.OrdinalIgnoreCase); + + // First pass: create direct (root-level) attribute variables + var directGroup = byPrimitive.FirstOrDefault(g => string.IsNullOrEmpty(g.Key)); + if (directGroup != null) { - CreateAttributeVariable(node, attr); + foreach (var attr in directGroup) + { + var variable = CreateAttributeVariable(node, attr); + if (primitiveGroupNames.Contains(attr.AttributeName)) + { + variableNodes[attr.AttributeName] = variable; + } + } + } + + // Second pass: add primitive child attributes under the matching variable node + foreach (var group in byPrimitive) + { + if (string.IsNullOrEmpty(group.Key)) + continue; + + NodeState parentForAttrs; + if (variableNodes.TryGetValue(group.Key, out var existingVariable)) + { + // Merge: use the existing variable node as parent + parentForAttrs = existingVariable; + } + else + { + // No matching dynamic attribute — create an object node + var primNode = CreateObject(node, group.Key, group.Key); + primNode.NodeId = new NodeId(obj.TagName + "." + group.Key, NamespaceIndex); + AddPredefinedNode(SystemContext, primNode); + parentForAttrs = primNode; + } + + foreach (var attr in group) + { + CreateAttributeVariable(parentForAttrs, attr); + } } } } - Log.Information("Address space built: {Objects} objects, {Variables} variables, {Mappings} tag references", - ObjectNodeCount, VariableNodeCount, _nodeIdToTagReference.Count); + // Build alarm tracking: create AlarmConditionState for each alarm attribute + foreach (var obj in sorted) + { + if (obj.IsArea) continue; + if (!attrsByObject.TryGetValue(obj.GobjectId, out var objAttrs)) continue; + + var hasAlarms = false; + var alarmAttrs = objAttrs.Where(a => a.IsAlarm && string.IsNullOrEmpty(a.PrimitiveName)).ToList(); + foreach (var alarmAttr in alarmAttrs) + { + var inAlarmTagRef = alarmAttr.FullTagReference.TrimEnd('[', ']') + ".InAlarm"; + if (!_tagToVariableNode.ContainsKey(inAlarmTagRef)) + continue; + + var alarmNodeIdStr = alarmAttr.FullTagReference.EndsWith("[]") + ? alarmAttr.FullTagReference.Substring(0, alarmAttr.FullTagReference.Length - 2) + : alarmAttr.FullTagReference; + + // Find the source variable node for the alarm + _tagToVariableNode.TryGetValue(alarmAttr.FullTagReference, out var sourceVariable); + var sourceNodeId = new NodeId(alarmNodeIdStr, NamespaceIndex); + + // Create AlarmConditionState attached to the source variable + var conditionNodeId = new NodeId(alarmNodeIdStr + ".Condition", NamespaceIndex); + var condition = new AlarmConditionState(sourceVariable); + condition.Create(SystemContext, conditionNodeId, + new QualifiedName(alarmAttr.AttributeName + "Alarm", NamespaceIndex), + new LocalizedText("en", alarmAttr.AttributeName + " Alarm"), + true); + condition.SourceNode.Value = sourceNodeId; + condition.SourceName.Value = alarmAttr.AttributeName; + condition.ConditionName.Value = alarmAttr.AttributeName; + condition.AutoReportStateChanges = true; + + // Set initial state: enabled, inactive, acknowledged + condition.SetEnableState(SystemContext, true); + condition.SetActiveState(SystemContext, false); + condition.SetAcknowledgedState(SystemContext, true); + condition.SetSeverity(SystemContext, EventSeverity.Medium); + condition.Retain.Value = false; + condition.OnReportEvent = (context, node, e) => Server.ReportEvent(context, e); + + // Add HasCondition reference from source to condition + if (sourceVariable != null) + { + sourceVariable.AddReference(ReferenceTypeIds.HasCondition, false, conditionNodeId); + condition.AddReference(ReferenceTypeIds.HasCondition, true, sourceNodeId); + } + + AddPredefinedNode(SystemContext, condition); + + var baseTagRef = alarmAttr.FullTagReference.TrimEnd('[', ']'); + _alarmInAlarmTags[inAlarmTagRef] = new AlarmInfo + { + SourceTagReference = alarmAttr.FullTagReference, + SourceNodeId = sourceNodeId, + SourceName = alarmAttr.AttributeName, + ConditionNode = condition, + PriorityTagReference = baseTagRef + ".Priority", + DescAttrNameTagReference = baseTagRef + ".DescAttrName" + }; + hasAlarms = true; + } + + // Enable EventNotifier on object nodes that contain alarms + if (hasAlarms && nodeMap.TryGetValue(obj.GobjectId, out var objNode)) + { + if (objNode is BaseObjectState objState) + objState.EventNotifier = EventNotifiers.SubscribeToEvents; + else if (objNode is FolderState folderState) + folderState.EventNotifier = EventNotifiers.SubscribeToEvents; + } + } + + // Auto-subscribe to InAlarm tags so we detect alarm transitions + SubscribeAlarmTags(); + + Log.Information("Address space built: {Objects} objects, {Variables} variables, {Mappings} tag references, {Alarms} alarm tags", + ObjectNodeCount, VariableNodeCount, _nodeIdToTagReference.Count, _alarmInAlarmTags.Count); } } + private void SubscribeAlarmTags() + { + foreach (var kvp in _alarmInAlarmTags) + { + // Subscribe to InAlarm, Priority, and DescAttrName for each alarm + var tagsToSubscribe = new[] { kvp.Key, kvp.Value.PriorityTagReference, kvp.Value.DescAttrNameTagReference }; + foreach (var tag in tagsToSubscribe) + { + if (string.IsNullOrEmpty(tag) || !_tagToVariableNode.ContainsKey(tag)) + continue; + try + { + _mxAccessClient.SubscribeAsync(tag, (_, _) => { }); + } + catch (Exception ex) + { + Log.Warning(ex, "Failed to auto-subscribe to alarm tag {Tag}", tag); + } + } + } + } + + private void ReportAlarmEvent(AlarmInfo info, bool active) + { + var condition = info.ConditionNode; + if (condition == null) + return; + + ushort severity = info.CachedSeverity; + string message = active + ? (!string.IsNullOrEmpty(info.CachedMessage) ? info.CachedMessage : $"Alarm active: {info.SourceName}") + : $"Alarm cleared: {info.SourceName}"; + + condition.SetActiveState(SystemContext, active); + condition.Message.Value = new LocalizedText("en", message); + condition.SetSeverity(SystemContext, (EventSeverity)severity); + + // Retain while active or unacknowledged + condition.Retain.Value = active || (condition.AckedState?.Id?.Value == false); + + // Reset acknowledged state when alarm activates + if (active) + condition.SetAcknowledgedState(SystemContext, false); + + // Report through the source node hierarchy so events reach subscribers on parent objects + if (_tagToVariableNode.TryGetValue(info.SourceTagReference, out var sourceVar) && sourceVar.Parent != null) + { + sourceVar.Parent.ReportEvent(SystemContext, condition); + } + + // Also report to Server node for clients subscribed at server level + Server.ReportEvent(SystemContext, condition); + + Log.Information("Alarm {State}: {Source} (Severity={Severity}, Message={Message})", + active ? "ACTIVE" : "CLEARED", info.SourceName, severity, message); + } + /// /// Rebuilds the address space, removing old nodes and creating new ones. (OPC-010) /// @@ -197,6 +442,19 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa } } + // Unsubscribe auto-subscribed alarm tags + foreach (var kvp in _alarmInAlarmTags) + { + foreach (var tag in new[] { kvp.Key, kvp.Value.PriorityTagReference, kvp.Value.DescAttrNameTagReference }) + { + if (!string.IsNullOrEmpty(tag)) + { + try { _mxAccessClient.UnsubscribeAsync(tag).GetAwaiter().GetResult(); } + catch { /* ignore */ } + } + } + } + // Remove all predefined nodes foreach (var nodeId in PredefinedNodes.Keys.ToList()) { @@ -260,7 +518,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa return result; } - private void CreateAttributeVariable(NodeState parent, GalaxyAttributeInfo attr) + private BaseDataVariableState CreateAttributeVariable(NodeState parent, GalaxyAttributeInfo attr) { var opcUaDataTypeId = MxDataTypeMapper.MapToOpcUaDataType(attr.MxDataType); var variable = CreateVariable(parent, attr.AttributeName, attr.AttributeName, new NodeId(opcUaDataTypeId), @@ -274,8 +532,16 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa variable.ArrayDimensions = new ReadOnlyList(new List { (uint)attr.ArrayDimension.Value }); } - variable.AccessLevel = AccessLevels.CurrentReadOrWrite; - variable.UserAccessLevel = AccessLevels.CurrentReadOrWrite; + var accessLevel = SecurityClassificationMapper.IsWritable(attr.SecurityClassification) + ? AccessLevels.CurrentReadOrWrite + : AccessLevels.CurrentRead; + if (attr.IsHistorized) + { + accessLevel |= AccessLevels.HistoryRead; + } + variable.AccessLevel = accessLevel; + variable.UserAccessLevel = accessLevel; + variable.Historizing = attr.IsHistorized; variable.Value = NormalizePublishedValue(attr.FullTagReference, null); variable.StatusCode = StatusCodes.BadWaitingForInitialData; variable.Timestamp = DateTime.UtcNow; @@ -290,6 +556,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa ArrayDimension = attr.ArrayDimension }; VariableNodeCount++; + return variable; } private static string GetNodeIdentifier(GalaxyAttributeInfo attr) @@ -412,6 +679,10 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa if (nodesToWrite[i].AttributeId != Attributes.Value) continue; + // Skip if base rejected due to access level (read-only node) + if (errors[i] != null && errors[i].StatusCode == StatusCodes.BadNotWritable) + continue; + var nodeId = nodesToWrite[i].NodeId; if (nodeId.NamespaceIndex != NamespaceIndex) continue; @@ -551,21 +822,167 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa #endregion + #region Condition Refresh + + /// + /// Reports all active retained alarm conditions during a condition refresh. + /// + public override ServiceResult ConditionRefresh(OperationContext context, IList monitoredItems) + { + foreach (var kvp in _alarmInAlarmTags) + { + var info = kvp.Value; + if (info.ConditionNode == null || info.ConditionNode.Retain?.Value != true) + continue; + + foreach (var item in monitoredItems) + { + item.QueueEvent(info.ConditionNode); + } + } + + return ServiceResult.Good; + } + + #endregion + + #region HistoryRead + + /// + protected override void HistoryReadRawModified( + ServerSystemContext context, + ReadRawModifiedDetails details, + TimestampsToReturn timestampsToReturn, + IList nodesToRead, + IList results, + IList errors, + List nodesToProcess, + IDictionary cache) + { + foreach (var handle in nodesToProcess) + { + var idx = handle.Index; + var nodeIdStr = handle.NodeId?.Identifier as string; + if (nodeIdStr == null || !_nodeIdToTagReference.TryGetValue(nodeIdStr, out var tagRef)) + { + errors[idx] = new ServiceResult(StatusCodes.BadNodeIdUnknown); + continue; + } + + if (_historianDataSource == null) + { + errors[idx] = new ServiceResult(StatusCodes.BadHistoryOperationUnsupported); + continue; + } + + try + { + var maxValues = details.NumValuesPerNode > 0 ? (int)details.NumValuesPerNode : 0; + var dataValues = _historianDataSource.ReadRawAsync( + tagRef, details.StartTime, details.EndTime, maxValues) + .GetAwaiter().GetResult(); + + var historyData = new HistoryData(); + historyData.DataValues.AddRange(dataValues); + + results[idx] = new HistoryReadResult + { + StatusCode = StatusCodes.Good, + HistoryData = new ExtensionObject(historyData) + }; + errors[idx] = ServiceResult.Good; + } + catch (Exception ex) + { + Log.Warning(ex, "HistoryRead raw failed for {TagRef}", tagRef); + errors[idx] = new ServiceResult(StatusCodes.BadInternalError); + } + } + } + + /// + protected override void HistoryReadProcessed( + ServerSystemContext context, + ReadProcessedDetails details, + TimestampsToReturn timestampsToReturn, + IList nodesToRead, + IList results, + IList errors, + List nodesToProcess, + IDictionary cache) + { + foreach (var handle in nodesToProcess) + { + var idx = handle.Index; + var nodeIdStr = handle.NodeId?.Identifier as string; + if (nodeIdStr == null || !_nodeIdToTagReference.TryGetValue(nodeIdStr, out var tagRef)) + { + errors[idx] = new ServiceResult(StatusCodes.BadNodeIdUnknown); + continue; + } + + if (_historianDataSource == null) + { + errors[idx] = new ServiceResult(StatusCodes.BadHistoryOperationUnsupported); + continue; + } + + if (details.AggregateType == null || details.AggregateType.Count == 0) + { + errors[idx] = new ServiceResult(StatusCodes.BadAggregateListMismatch); + continue; + } + + var aggregateId = details.AggregateType[idx < details.AggregateType.Count ? idx : 0]; + var column = HistorianDataSource.MapAggregateToColumn(aggregateId); + if (column == null) + { + errors[idx] = new ServiceResult(StatusCodes.BadAggregateNotSupported); + continue; + } + + try + { + var dataValues = _historianDataSource.ReadAggregateAsync( + tagRef, details.StartTime, details.EndTime, + details.ProcessingInterval, column) + .GetAwaiter().GetResult(); + + var historyData = new HistoryData(); + historyData.DataValues.AddRange(dataValues); + + results[idx] = new HistoryReadResult + { + StatusCode = StatusCodes.Good, + HistoryData = new ExtensionObject(historyData) + }; + errors[idx] = ServiceResult.Good; + } + catch (Exception ex) + { + Log.Warning(ex, "HistoryRead processed failed for {TagRef}", tagRef); + errors[idx] = new ServiceResult(StatusCodes.BadInternalError); + } + } + } + + #endregion + #region Subscription Delivery /// - /// Called by the OPC UA framework after monitored items are created on nodes in our namespace. - /// Triggers ref-counted MXAccess subscriptions for the underlying tags. + /// Called by the OPC UA framework during monitored item creation. + /// Triggers ref-counted MXAccess subscriptions early so the runtime value + /// can arrive before the initial publish to the client. /// /// - protected override void OnCreateMonitoredItemsComplete(ServerSystemContext context, IList monitoredItems) + protected override void OnMonitoredItemCreated(ServerSystemContext context, NodeHandle handle, MonitoredItem monitoredItem) { - foreach (var item in monitoredItems) - { - var nodeIdStr = GetNodeIdString(item); - if (nodeIdStr != null && _nodeIdToTagReference.TryGetValue(nodeIdStr, out var tagRef)) - SubscribeTag(tagRef); - } + base.OnMonitoredItemCreated(context, handle, monitoredItem); + + var nodeIdStr = handle?.NodeId?.Identifier as string; + if (nodeIdStr != null && _nodeIdToTagReference.TryGetValue(nodeIdStr, out var tagRef)) + SubscribeTag(tagRef); } /// @@ -583,6 +1000,24 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa } } + /// + /// Called by the OPC UA framework after monitored items are transferred to a new session. + /// Rebuilds MXAccess subscription bookkeeping when transferred items arrive without local in-memory state. + /// + /// + protected override void OnMonitoredItemsTransferred(ServerSystemContext context, IList monitoredItems) + { + base.OnMonitoredItemsTransferred(context, monitoredItems); + + var transferredTagRefs = monitoredItems + .Select(GetNodeIdString) + .Where(nodeIdStr => nodeIdStr != null && _nodeIdToTagReference.ContainsKey(nodeIdStr)) + .Select(nodeIdStr => _nodeIdToTagReference[nodeIdStr!]) + .ToList(); + + RestoreTransferredSubscriptions(transferredTagRefs); + } + private static string? GetNodeIdString(IMonitoredItem item) { if (item.ManagerHandle is NodeState node) @@ -633,23 +1068,206 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa } } + /// + /// Rebuilds subscription reference counts for monitored items that were transferred by the OPC UA stack. + /// Existing in-memory bookkeeping is preserved to avoid double-counting normal in-process transfers. + /// + /// The Galaxy tag references represented by the transferred monitored items. + internal void RestoreTransferredSubscriptions(IEnumerable fullTagReferences) + { + var transferredCounts = fullTagReferences + .GroupBy(tagRef => tagRef, StringComparer.OrdinalIgnoreCase) + .ToDictionary(g => g.Key, g => g.Count(), StringComparer.OrdinalIgnoreCase); + + foreach (var kvp in transferredCounts) + { + lock (_lock) + { + if (_subscriptionRefCounts.ContainsKey(kvp.Key)) + continue; + + _subscriptionRefCounts[kvp.Key] = kvp.Value; + } + + _ = _mxAccessClient.SubscribeAsync(kvp.Key, (_, _) => { }); + } + } + private void OnMxAccessDataChange(string address, Vtq vtq) { - if (_tagToVariableNode.TryGetValue(address, out var variable)) + Interlocked.Increment(ref _totalMxChangeEvents); + _pendingDataChanges[address] = vtq; + _dataChangeSignal.Set(); + } + + #endregion + + #region Data Change Dispatch + + private void StartDispatchThread() + { + _dispatchRunning = true; + _dispatchThread = new Thread(DispatchLoop) { - try + Name = "OpcUaDataChangeDispatch", + IsBackground = true + }; + _dispatchThread.Start(); + } + + private void StopDispatchThread() + { + _dispatchRunning = false; + _dataChangeSignal.Set(); + _dispatchThread?.Join(TimeSpan.FromSeconds(5)); + } + + private void DispatchLoop() + { + Log.Information("Data change dispatch thread started"); + + while (_dispatchRunning) + { + _dataChangeSignal.WaitOne(TimeSpan.FromMilliseconds(100)); + + if (!_dispatchRunning) break; + + var keys = _pendingDataChanges.Keys.ToList(); + if (keys.Count == 0) { - var dataValue = CreatePublishedDataValue(address, vtq); - variable.Value = dataValue.Value; - variable.StatusCode = dataValue.StatusCode; - variable.Timestamp = dataValue.SourceTimestamp; - variable.ClearChangeMasks(SystemContext, false); + ReportDispatchMetricsIfDue(); + continue; } - catch (Exception ex) + + // Prepare updates outside the Lock — no IO, just value conversion + var updates = new List<(BaseDataVariableState variable, DataValue dataValue)>(keys.Count); + var pendingAlarmEvents = new List<(AlarmInfo info, bool active)>(); + + foreach (var address in keys) { - Log.Warning(ex, "Error updating variable node for {Address}", address); + if (_pendingDataChanges.TryRemove(address, out var vtq)) + { + if (_tagToVariableNode.TryGetValue(address, out var variable)) + { + try + { + var dataValue = CreatePublishedDataValue(address, vtq); + updates.Add((variable, dataValue)); + } + catch (Exception ex) + { + Log.Warning(ex, "Error preparing data change for {Address}", address); + } + } + + // Check for alarm InAlarm transitions + if (_alarmInAlarmTags.TryGetValue(address, out var alarmInfo)) + { + var newInAlarm = vtq.Value is true || vtq.Value is 1 || (vtq.Value is int intVal && intVal != 0); + if (newInAlarm != alarmInfo.LastInAlarm) + { + alarmInfo.LastInAlarm = newInAlarm; + + // Read Priority and DescAttrName via MXAccess (outside Lock, safe here) + if (newInAlarm) + { + try + { + var pVtq = _mxAccessClient.ReadAsync(alarmInfo.PriorityTagReference).GetAwaiter().GetResult(); + if (pVtq.Value is int ip) alarmInfo.CachedSeverity = (ushort)System.Math.Min(System.Math.Max(ip, 1), 1000); + else if (pVtq.Value is short sp) alarmInfo.CachedSeverity = (ushort)System.Math.Min(System.Math.Max((int)sp, 1), 1000); + } + catch { /* keep previous */ } + + try + { + var dVtq = _mxAccessClient.ReadAsync(alarmInfo.DescAttrNameTagReference).GetAwaiter().GetResult(); + if (dVtq.Value is string desc && !string.IsNullOrEmpty(desc)) + alarmInfo.CachedMessage = desc; + } + catch { /* keep previous */ } + } + + pendingAlarmEvents.Add((alarmInfo, newInAlarm)); + } + } + } } + + // Apply under Lock so ClearChangeMasks propagates to monitored items + if (updates.Count > 0 || pendingAlarmEvents.Count > 0) + { + lock (Lock) + { + foreach (var (variable, dataValue) in updates) + { + variable.Value = dataValue.Value; + variable.StatusCode = dataValue.StatusCode; + variable.Timestamp = dataValue.SourceTimestamp; + variable.ClearChangeMasks(SystemContext, false); + } + + // Report alarm events + foreach (var (info, active) in pendingAlarmEvents) + { + try + { + ReportAlarmEvent(info, active); + } + catch (Exception ex) + { + Log.Warning(ex, "Error reporting alarm event for {Source}", info.SourceName); + } + } + } + } + + Interlocked.Add(ref _totalDispatchBatchSize, updates.Count); + Interlocked.Increment(ref _dispatchCycleCount); + ReportDispatchMetricsIfDue(); } + + Log.Information("Data change dispatch thread stopped"); + } + + private void ReportDispatchMetricsIfDue() + { + var now = DateTime.UtcNow; + var elapsed = (now - _lastMetricsReportTime).TotalSeconds; + if (elapsed < 60) return; + + var totalEvents = Interlocked.Read(ref _totalMxChangeEvents); + var lastReported = Interlocked.Read(ref _lastReportedMxChangeEvents); + var eventsPerSecond = (totalEvents - lastReported) / elapsed; + Interlocked.Exchange(ref _lastReportedMxChangeEvents, totalEvents); + + var batchSize = Interlocked.Read(ref _totalDispatchBatchSize); + var cycles = Interlocked.Read(ref _dispatchCycleCount); + var avgQueueSize = cycles > 0 ? (double)batchSize / cycles : 0; + + // Reset rolling counters + Interlocked.Exchange(ref _totalDispatchBatchSize, 0); + Interlocked.Exchange(ref _dispatchCycleCount, 0); + + _lastMetricsReportTime = now; + _lastEventsPerSecond = eventsPerSecond; + _lastAvgBatchSize = avgQueueSize; + + Log.Information( + "DataChange dispatch: EventsPerSec={EventsPerSec:F1}, AvgBatchSize={AvgBatchSize:F1}, PendingItems={Pending}, TotalEvents={Total}", + eventsPerSecond, avgQueueSize, _pendingDataChanges.Count, totalEvents); + } + + /// + protected override void Dispose(bool disposing) + { + if (disposing) + { + StopDispatchThread(); + _dataChangeSignal.Dispose(); + } + + base.Dispose(disposing); } #endregion diff --git a/src/ZB.MOM.WW.LmxOpcUa.Host/OpcUa/LmxOpcUaServer.cs b/src/ZB.MOM.WW.LmxOpcUa.Host/OpcUa/LmxOpcUaServer.cs index bd39671..3092333 100644 --- a/src/ZB.MOM.WW.LmxOpcUa.Host/OpcUa/LmxOpcUaServer.cs +++ b/src/ZB.MOM.WW.LmxOpcUa.Host/OpcUa/LmxOpcUaServer.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using Opc.Ua; using Opc.Ua.Server; using ZB.MOM.WW.LmxOpcUa.Host.Domain; +using ZB.MOM.WW.LmxOpcUa.Host.Historian; using ZB.MOM.WW.LmxOpcUa.Host.Metrics; namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa @@ -14,6 +15,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa private readonly string _galaxyName; private readonly IMxAccessClient _mxAccessClient; private readonly PerformanceMetrics _metrics; + private readonly HistorianDataSource? _historianDataSource; private LmxNodeManager? _nodeManager; /// @@ -39,18 +41,20 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa /// The Galaxy name used to construct the namespace URI and product URI. /// The runtime client used by the node manager for live data access. /// The metrics collector shared with the node manager. - public LmxOpcUaServer(string galaxyName, IMxAccessClient mxAccessClient, PerformanceMetrics metrics) + public LmxOpcUaServer(string galaxyName, IMxAccessClient mxAccessClient, PerformanceMetrics metrics, + HistorianDataSource? historianDataSource = null) { _galaxyName = galaxyName; _mxAccessClient = mxAccessClient; _metrics = metrics; + _historianDataSource = historianDataSource; } /// protected override MasterNodeManager CreateMasterNodeManager(IServerInternal server, ApplicationConfiguration configuration) { var namespaceUri = $"urn:{_galaxyName}:LmxOpcUa"; - _nodeManager = new LmxNodeManager(server, configuration, namespaceUri, _mxAccessClient, _metrics); + _nodeManager = new LmxNodeManager(server, configuration, namespaceUri, _mxAccessClient, _metrics, _historianDataSource); var nodeManagers = new List { _nodeManager }; return new MasterNodeManager(server, configuration, null, nodeManagers.ToArray()); diff --git a/src/ZB.MOM.WW.LmxOpcUa.Host/OpcUa/OpcUaServerHost.cs b/src/ZB.MOM.WW.LmxOpcUa.Host/OpcUa/OpcUaServerHost.cs index 2ff04ec..d46ecb8 100644 --- a/src/ZB.MOM.WW.LmxOpcUa.Host/OpcUa/OpcUaServerHost.cs +++ b/src/ZB.MOM.WW.LmxOpcUa.Host/OpcUa/OpcUaServerHost.cs @@ -6,6 +6,7 @@ using Opc.Ua.Server; using Serilog; using ZB.MOM.WW.LmxOpcUa.Host.Configuration; using ZB.MOM.WW.LmxOpcUa.Host.Domain; +using ZB.MOM.WW.LmxOpcUa.Host.Historian; using ZB.MOM.WW.LmxOpcUa.Host.Metrics; namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa @@ -20,6 +21,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa private readonly OpcUaConfiguration _config; private readonly IMxAccessClient _mxAccessClient; private readonly PerformanceMetrics _metrics; + private readonly HistorianDataSource? _historianDataSource; private ApplicationInstance? _application; private LmxOpcUaServer? _server; @@ -44,11 +46,13 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa /// The endpoint and session settings for the OPC UA host. /// The runtime client used by the node manager for live reads, writes, and subscriptions. /// The metrics collector shared with the node manager and runtime bridge. - public OpcUaServerHost(OpcUaConfiguration config, IMxAccessClient mxAccessClient, PerformanceMetrics metrics) + public OpcUaServerHost(OpcUaConfiguration config, IMxAccessClient mxAccessClient, PerformanceMetrics metrics, + HistorianDataSource? historianDataSource = null) { _config = config; _mxAccessClient = mxAccessClient; _metrics = metrics; + _historianDataSource = historianDataSource; } /// @@ -155,7 +159,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa certOk = await _application.CheckApplicationInstanceCertificate(false, 2048); } - _server = new LmxOpcUaServer(_config.GalaxyName, _mxAccessClient, _metrics); + _server = new LmxOpcUaServer(_config.GalaxyName, _mxAccessClient, _metrics, _historianDataSource); await _application.Start(_server); Log.Information("OPC UA server started on opc.tcp://localhost:{Port}{EndpointPath} (namespace={Namespace})", diff --git a/src/ZB.MOM.WW.LmxOpcUa.Host/OpcUaService.cs b/src/ZB.MOM.WW.LmxOpcUa.Host/OpcUaService.cs index eb01f9a..0260133 100644 --- a/src/ZB.MOM.WW.LmxOpcUa.Host/OpcUaService.cs +++ b/src/ZB.MOM.WW.LmxOpcUa.Host/OpcUaService.cs @@ -54,6 +54,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host configuration.GetSection("MxAccess").Bind(_config.MxAccess); configuration.GetSection("GalaxyRepository").Bind(_config.GalaxyRepository); configuration.GetSection("Dashboard").Bind(_config.Dashboard); + configuration.GetSection("Historian").Bind(_config.Historian); _mxProxy = new MxProxyAdapter(); _galaxyRepository = new GalaxyRepositoryService(_config.GalaxyRepository); @@ -152,7 +153,8 @@ namespace ZB.MOM.WW.LmxOpcUa.Host // Step 8: Create OPC UA server host + node manager var effectiveMxClient = (IMxAccessClient?)_mxAccessClient ?? _mxAccessClientForWiring ?? new NullMxAccessClient(); - _serverHost = new OpcUaServerHost(_config.OpcUa, effectiveMxClient, _metrics); + var historianDataSource = new Historian.HistorianDataSource(_config.Historian); + _serverHost = new OpcUaServerHost(_config.OpcUa, effectiveMxClient, _metrics, historianDataSource); // Step 9-10: Query hierarchy, start server, build address space DateTime? initialDeployTime = null; @@ -202,7 +204,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host // Step 13: Dashboard _healthCheck = new HealthCheckService(); _statusReport = new StatusReportService(_healthCheck, _config.Dashboard.RefreshIntervalSeconds); - _statusReport.SetComponents(effectiveMxClient, _metrics, _galaxyStats, _serverHost); + _statusReport.SetComponents(effectiveMxClient, _metrics, _galaxyStats, _serverHost, _nodeManager); if (_config.Dashboard.Enabled) { diff --git a/src/ZB.MOM.WW.LmxOpcUa.Host/Status/StatusData.cs b/src/ZB.MOM.WW.LmxOpcUa.Host/Status/StatusData.cs index 87948a8..0e8db72 100644 --- a/src/ZB.MOM.WW.LmxOpcUa.Host/Status/StatusData.cs +++ b/src/ZB.MOM.WW.LmxOpcUa.Host/Status/StatusData.cs @@ -29,6 +29,11 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.Status /// public GalaxyInfo Galaxy { get; set; } = new(); + /// + /// Gets or sets MXAccess data change dispatch queue metrics. + /// + public DataChangeInfo DataChange { get; set; } = new(); + /// /// Gets or sets per-operation performance statistics used to diagnose bridge throughput and latency. /// @@ -129,6 +134,32 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.Status public DateTime? LastRebuildTime { get; set; } } + /// + /// Dashboard model for MXAccess data change dispatch metrics. + /// + public class DataChangeInfo + { + /// + /// Gets or sets the rate of MXAccess data change events received per second. + /// + public double EventsPerSecond { get; set; } + + /// + /// Gets or sets the average number of items processed per dispatch cycle. + /// + public double AvgBatchSize { get; set; } + + /// + /// Gets or sets the number of items currently waiting in the dispatch queue. + /// + public int PendingItems { get; set; } + + /// + /// Gets or sets the total MXAccess data change events received since startup. + /// + public long TotalEvents { get; set; } + } + /// /// Dashboard model for the status page footer. /// diff --git a/src/ZB.MOM.WW.LmxOpcUa.Host/Status/StatusReportService.cs b/src/ZB.MOM.WW.LmxOpcUa.Host/Status/StatusReportService.cs index dd9a8a0..c52941a 100644 --- a/src/ZB.MOM.WW.LmxOpcUa.Host/Status/StatusReportService.cs +++ b/src/ZB.MOM.WW.LmxOpcUa.Host/Status/StatusReportService.cs @@ -20,6 +20,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.Status private PerformanceMetrics? _metrics; private GalaxyRepositoryStats? _galaxyStats; private OpcUaServerHost? _serverHost; + private LmxNodeManager? _nodeManager; /// /// Initializes a new status report service for the dashboard using the supplied health-check policy and refresh interval. @@ -40,12 +41,14 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.Status /// The Galaxy repository statistics to surface on the dashboard. /// The OPC UA server host whose active session count should be reported. public void SetComponents(IMxAccessClient? mxAccessClient, PerformanceMetrics? metrics, - GalaxyRepositoryStats? galaxyStats, OpcUaServerHost? serverHost) + GalaxyRepositoryStats? galaxyStats, OpcUaServerHost? serverHost, + LmxNodeManager? nodeManager = null) { _mxAccessClient = mxAccessClient; _metrics = metrics; _galaxyStats = galaxyStats; _serverHost = serverHost; + _nodeManager = nodeManager; } /// @@ -78,6 +81,13 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.Status AttributeCount = _galaxyStats?.AttributeCount ?? 0, LastRebuildTime = _galaxyStats?.LastRebuildTime }, + DataChange = new DataChangeInfo + { + EventsPerSecond = _nodeManager?.MxChangeEventsPerSecond ?? 0, + AvgBatchSize = _nodeManager?.AverageDispatchBatchSize ?? 0, + PendingItems = _nodeManager?.PendingDataChangeCount ?? 0, + TotalEvents = _nodeManager?.TotalMxChangeEvents ?? 0 + }, Operations = _metrics?.GetStatistics() ?? new(), Footer = new FooterInfo { @@ -97,6 +107,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.Status var sb = new StringBuilder(); sb.AppendLine(""); + sb.AppendLine(""); sb.AppendLine($""); sb.AppendLine("LmxOpcUa Status"); sb.AppendLine("