Doc refresh (task #202) — core architecture docs for multi-driver OtOpcUa
Rewrite seven core-architecture docs to match the shipped multi-driver platform. The v1 single-driver LmxNodeManager framing is replaced with the Core + capability-interface model — Galaxy is now one driver of seven, and each doc points at the current class names + source paths. What changed per file: - OpcUaServer.md — OtOpcUaServer as StandardServer host; per-driver DriverNodeManager + CapabilityInvoker wiring; Config-DB-driven configuration (sp_PublishGeneration, DraftRevisionToken, Admin UI); Phase 6.2 AuthorizationGate integration. - AddressSpace.md — GenericDriverNodeManager.BuildAddressSpaceAsync walks ITagDiscovery.DiscoverAsync and streams DriverAttributeInfo through IAddressSpaceBuilder; CapturingBuilder registers alarm-condition sinks; per-driver NodeId schemes replace the fixed ns=1;s=ZB root. - ReadWriteOperations.md — OnReadValue / OnWriteValue dispatch to IReadable.ReadAsync / IWritable.WriteAsync through CapabilityInvoker, honoring WriteIdempotentAttribute (#143); two-layer authorization (WriteAuthzPolicy + Phase 6.2 AuthorizationGate). - Subscriptions.md — ISubscribable.SubscribeAsync/UnsubscribeAsync is the capability surface; STA-thread story is now Galaxy-specific (StaPump inside Driver.Galaxy.Host), other drivers are free-threaded. - AlarmTracking.md — IAlarmSource is optional; AlarmSurfaceInvoker wraps Subscribe/Ack/Unsubscribe with fan-out by IPerCallHostResolver and the no-retry AlarmAcknowledge pipeline (#143); CapturingBuilder registers sinks at build time. - DataTypeMapping.md — DriverDataType + SecurityClassification are the driver-agnostic enums; per-driver mappers (GalaxyProxyDriver inline, AbCipDataType, ModbusDriver, etc.); SecurityClassification is metadata only, ACL enforcement is at the server layer. - IncrementalSync.md — IRediscoverable covers backend-change signals; sp_ComputeGenerationDiff + DiffViewer drive generation-level change detection; IDriver.ReinitializeAsync is the in-process recovery path.
This commit is contained in:
@@ -1,135 +1,60 @@
|
||||
# Subscriptions
|
||||
|
||||
`LmxNodeManager` bridges OPC UA monitored items to MXAccess runtime subscriptions using reference counting and a decoupled dispatch architecture. This design ensures that MXAccess COM callbacks (which run on the STA thread) never contend with the OPC UA framework lock.
|
||||
Driver-side data-change subscriptions live behind `ISubscribable` (`src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/ISubscribable.cs`). The interface is deliberately mechanism-agnostic: it covers native subscriptions (Galaxy MXAccess advisory, OPC UA monitored items on an upstream server, TwinCAT ADS notifications) and driver-internal polled subscriptions (Modbus, AB CIP, S7, FOCAS). Core sees the same event shape regardless — drivers fire `OnDataChange` and Core dispatches to the matching OPC UA monitored items.
|
||||
|
||||
## Ref-Counted MXAccess Subscriptions
|
||||
|
||||
Multiple OPC UA clients can subscribe to the same Galaxy tag simultaneously. Rather than opening duplicate MXAccess subscriptions, `LmxNodeManager` maintains a reference count per tag in `_subscriptionRefCounts`.
|
||||
|
||||
### SubscribeTag
|
||||
|
||||
`SubscribeTag` increments the reference count for a tag reference. On the first subscription (count goes from 0 to 1), it calls `_mxAccessClient.SubscribeAsync` to open the MXAccess runtime subscription:
|
||||
## ISubscribable surface
|
||||
|
||||
```csharp
|
||||
internal void SubscribeTag(string fullTagReference)
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
if (_subscriptionRefCounts.TryGetValue(fullTagReference, out var count))
|
||||
_subscriptionRefCounts[fullTagReference] = count + 1;
|
||||
else
|
||||
{
|
||||
_subscriptionRefCounts[fullTagReference] = 1;
|
||||
_ = _mxAccessClient.SubscribeAsync(fullTagReference, (_, _) => { });
|
||||
}
|
||||
}
|
||||
}
|
||||
Task<ISubscriptionHandle> SubscribeAsync(
|
||||
IReadOnlyList<string> fullReferences,
|
||||
TimeSpan publishingInterval,
|
||||
CancellationToken cancellationToken);
|
||||
|
||||
Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken);
|
||||
|
||||
event EventHandler<DataChangeEventArgs>? OnDataChange;
|
||||
```
|
||||
|
||||
### UnsubscribeTag
|
||||
A single `SubscribeAsync` call may batch many attributes and returns an opaque handle the caller passes back to `UnsubscribeAsync`. The driver may emit an immediate `OnDataChange` for each subscribed reference (the OPC UA initial-data convention) and then a push per change.
|
||||
|
||||
`UnsubscribeTag` decrements the reference count. When the count reaches zero, the MXAccess subscription is closed via `UnsubscribeAsync` and the tag is removed from the dictionary:
|
||||
Every subscribe / unsubscribe call goes through `CapabilityInvoker.ExecuteAsync(DriverCapability.Subscribe, host, …)` so the per-host pipeline applies.
|
||||
|
||||
```csharp
|
||||
if (count <= 1)
|
||||
{
|
||||
_subscriptionRefCounts.Remove(fullTagReference);
|
||||
_ = _mxAccessClient.UnsubscribeAsync(fullTagReference);
|
||||
}
|
||||
else
|
||||
_subscriptionRefCounts[fullTagReference] = count - 1;
|
||||
```
|
||||
## Reference counting at Core
|
||||
|
||||
Both methods use `lock (_lock)` (a private object, distinct from the OPC UA framework `Lock`) to serialize ref-count updates without blocking node value dispatches.
|
||||
Multiple OPC UA clients can monitor the same variable simultaneously. Rather than open duplicate driver subscriptions, Core maintains a ref-count per `(driver, fullReference)` pair: the first OPC UA monitored-item for a reference triggers `ISubscribable.SubscribeAsync` with that single reference; each additional monitored-item just increments the count; decrement-to-zero triggers `UnsubscribeAsync`. Transferred subscriptions (client reconnect → resume session) replay against the same ref-count map so active driver subscriptions are preserved across session migration.
|
||||
|
||||
## OnMonitoredItemCreated
|
||||
## Threading
|
||||
|
||||
The OPC UA framework calls `OnMonitoredItemCreated` when a client creates a monitored item. The override resolves the node handle to a tag reference and calls `SubscribeTag`, which opens the MXAccess subscription early so runtime values start arriving before the first publish cycle:
|
||||
The STA thread story is now driver-specific, not a server-wide concern:
|
||||
|
||||
```csharp
|
||||
protected override void OnMonitoredItemCreated(ServerSystemContext context,
|
||||
NodeHandle handle, MonitoredItem monitoredItem)
|
||||
{
|
||||
base.OnMonitoredItemCreated(context, handle, monitoredItem);
|
||||
var nodeIdStr = handle?.NodeId?.Identifier as string;
|
||||
if (nodeIdStr != null && _nodeIdToTagReference.TryGetValue(nodeIdStr, out var tagRef))
|
||||
SubscribeTag(tagRef);
|
||||
}
|
||||
```
|
||||
- **Galaxy** runs its MXAccess COM objects on a dedicated STA thread with a Win32 message pump (`src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Sta/StaPump.cs`) inside the standalone `Driver.Galaxy.Host` Windows service. The Proxy driver (`Driver.Galaxy.Proxy`) connects to the Host via named pipe and re-exposes the data on a free-threaded surface to Core. Core never touches COM.
|
||||
- **Modbus / S7 / AB CIP / AB Legacy / TwinCAT / FOCAS** are free-threaded — they run their polling loops on ordinary `Task`s. Their `OnDataChange` fires on thread-pool threads.
|
||||
- **OPC UA Client** delegates to the OPC Foundation stack's subscription loop.
|
||||
|
||||
`OnDeleteMonitoredItemsComplete` performs the inverse, calling `UnsubscribeTag` for each deleted monitored item.
|
||||
The common contract: drivers are responsible for marshalling from whatever native thread the backend uses onto thread-pool threads before raising `OnDataChange`. Core's dispatch path acquires the OPC UA framework `Lock` and calls `ClearChangeMasks` on the corresponding `BaseDataVariableState` to notify subscribed clients.
|
||||
|
||||
## Data Change Dispatch Queue
|
||||
## Dispatch
|
||||
|
||||
MXAccess delivers data change callbacks on the STA thread via the `OnTagValueChanged` event. These callbacks must not acquire the OPC UA framework `Lock` directly because the lock is also held during `Read`/`Write` operations that call into MXAccess (creating a potential deadlock with the STA thread). The solution is a `ConcurrentDictionary<string, Vtq>` named `_pendingDataChanges` that decouples the two threads.
|
||||
Core's subscription dispatch path:
|
||||
|
||||
### Callback handler
|
||||
1. `ISubscribable.OnDataChange` fires on a thread-pool thread with a `DataChangeEventArgs(subscriptionHandle, fullReference, DataValueSnapshot)`.
|
||||
2. Core looks up the variable by `fullReference` in the driver's `DriverNodeManager` variable map.
|
||||
3. Under the OPC UA framework `Lock`, the variable's `Value` / `StatusCode` / `Timestamp` are updated and `ClearChangeMasks(SystemContext, false)` is called.
|
||||
4. The OPC Foundation stack then enqueues data-change notifications for every monitored-item attached to that variable, honoring each subscription's sampling + filter configuration.
|
||||
|
||||
`OnMxAccessDataChange` runs on the STA thread. It stores the latest value in the concurrent dictionary (coalescing rapid updates for the same tag) and signals the dispatch thread:
|
||||
Batch coalescing — coalescing multiple pushes for the same reference between publish cycles — is done driver-side when the backend natively supports it (Galaxy keeps the v1 coalescing dictionary); otherwise the SDK's own data-change filter suppresses no-change notifications.
|
||||
|
||||
```csharp
|
||||
private void OnMxAccessDataChange(string address, Vtq vtq)
|
||||
{
|
||||
Interlocked.Increment(ref _totalMxChangeEvents);
|
||||
_pendingDataChanges[address] = vtq;
|
||||
_dataChangeSignal.Set();
|
||||
}
|
||||
```
|
||||
## Initial values
|
||||
|
||||
### Dispatch thread architecture
|
||||
A freshly-built variable carries `StatusCode = BadWaitingForInitialData` until the driver delivers the first value. Drivers whose backends supply an initial read (Galaxy `AdviseSupervisory`, TwinCAT `AddDeviceNotification`) fire `OnDataChange` immediately after `SubscribeAsync` returns. Polled drivers fire the first push when their first poll cycle completes.
|
||||
|
||||
A dedicated background thread (`OpcUaDataChangeDispatch`) runs `DispatchLoop`, which waits on an `AutoResetEvent` with a 100ms timeout. The decoupled design exists for two reasons:
|
||||
## Transferred subscription restoration
|
||||
|
||||
1. **Deadlock avoidance** -- The STA thread must not acquire the OPC UA `Lock`. The dispatch thread is a normal background thread that can safely acquire `Lock`.
|
||||
2. **Batch coalescing** -- Multiple MXAccess callbacks for the same tag between dispatch cycles are collapsed to the latest value via dictionary key overwrite. Under high load, this reduces the number of `ClearChangeMasks` calls.
|
||||
When an OPC UA session is resumed (client reconnect with `TransferSubscriptions`), Core walks the transferred monitored-items and ensures every referenced `(driver, fullReference)` has a live driver subscription. References already active (in-process migration) skip re-subscribing; references that lost their driver-side handle during the session gap are re-subscribed via `SubscribeAsync`.
|
||||
|
||||
The dispatch loop processes changes in two phases:
|
||||
## Key source files
|
||||
|
||||
**Phase 1 (outside Lock):** Drain keys from `_pendingDataChanges`, convert each `Vtq` to a `DataValue` via `CreatePublishedDataValue`, and collect alarm transition events. MXAccess reads for alarm Priority and DescAttrName values also happen in this phase, since they call back into the STA thread.
|
||||
|
||||
**Phase 2 (inside Lock):** Apply all prepared updates to variable nodes and call `ClearChangeMasks` on each to trigger OPC UA data change notifications. Alarm events are reported in this same lock scope.
|
||||
|
||||
```csharp
|
||||
lock (Lock)
|
||||
{
|
||||
foreach (var (variable, dataValue) in updates)
|
||||
{
|
||||
variable.Value = dataValue.Value;
|
||||
variable.StatusCode = dataValue.StatusCode;
|
||||
variable.Timestamp = dataValue.SourceTimestamp;
|
||||
variable.ClearChangeMasks(SystemContext, false);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### ClearChangeMasks
|
||||
|
||||
`ClearChangeMasks(SystemContext, false)` is the mechanism that notifies the OPC UA framework a node's value has changed. The framework uses change masks internally to track which nodes have pending notifications for active monitored items. Calling this method causes the server to enqueue data change notifications for all monitoring clients of that node. The `false` parameter indicates that child nodes should not be recursively cleared.
|
||||
|
||||
## Transferred Subscription Restoration
|
||||
|
||||
When OPC UA sessions are transferred (e.g., client reconnects and resumes a previous session), the framework calls `OnMonitoredItemsTransferred`. The override collects the tag references for all transferred items and calls `RestoreTransferredSubscriptions`.
|
||||
|
||||
`RestoreTransferredSubscriptions` groups the tag references by count and, for each tag that does not already have an active ref-count entry, opens a new MXAccess subscription and sets the initial reference count:
|
||||
|
||||
```csharp
|
||||
internal void RestoreTransferredSubscriptions(IEnumerable<string> fullTagReferences)
|
||||
{
|
||||
var transferredCounts = fullTagReferences
|
||||
.GroupBy(tagRef => tagRef, StringComparer.OrdinalIgnoreCase)
|
||||
.ToDictionary(g => g.Key, g => g.Count(), StringComparer.OrdinalIgnoreCase);
|
||||
|
||||
foreach (var kvp in transferredCounts)
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
if (_subscriptionRefCounts.ContainsKey(kvp.Key))
|
||||
continue;
|
||||
_subscriptionRefCounts[kvp.Key] = kvp.Value;
|
||||
}
|
||||
_ = _mxAccessClient.SubscribeAsync(kvp.Key, (_, _) => { });
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Tags that already have in-memory bookkeeping are skipped to avoid double-counting when the transfer happens within the same server process (normal in-process session migration).
|
||||
- `src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/ISubscribable.cs` — capability contract
|
||||
- `src/ZB.MOM.WW.OtOpcUa.Core/Resilience/CapabilityInvoker.cs` — pipeline wrapping
|
||||
- `src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Sta/StaPump.cs` — Galaxy STA thread + message pump
|
||||
- Per-driver subscribe implementations in each `Driver.*` project
|
||||
|
||||
Reference in New Issue
Block a user