# Component: Data Connection Layer

## Purpose

The Data Connection Layer provides a uniform interface for reading from and writing to physical machines at site clusters. It abstracts protocol-specific details behind a common interface, manages subscriptions, and delivers live tag value updates to Instance Actors. It is a **clean data pipe** — it performs no evaluation of triggers, alarm conditions, or business logic.

## Location

Site clusters only. Central does not interact with machines directly.

## Responsibilities

- Manage data connections defined centrally and deployed to sites as part of artifact deployment (OPC UA servers, LmxProxy endpoints). Data connection definitions are stored in local SQLite after deployment.
- Establish and maintain connections to data sources based on deployed instance configurations.
- Subscribe to tag paths as requested by Instance Actors (based on attribute data source references in the flattened configuration).
- Deliver tag value updates to the requesting Instance Actors.
- Support writing values to machines (when Instance Actors forward `SetAttribute` write requests for data-connected attributes).
- Report data connection health status to the Health Monitoring component.

## Common Interface

Both OPC UA and LmxProxy implement the same interface:

```
IDataConnection : IAsyncDisposable
├── Connect(connectionDetails) → void
├── Disconnect() → void
├── Subscribe(tagPath, callback) → subscriptionId
├── Unsubscribe(subscriptionId) → void
├── Read(tagPath) → value
├── ReadBatch(tagPaths) → values
├── Write(tagPath, value) → void
├── WriteBatch(values) → void
├── WriteBatchAndWait(values, flagPath, flagValue, responsePath, responseValue, timeout) → bool
├── Status → ConnectionHealth
└── Disconnected → event Action?
```

The `Disconnected` event is raised by an adapter when it detects an unexpected connection loss (server offline, network failure, keep-alive timeout). The `DataConnectionActor` subscribes to this event to trigger the reconnection state machine. Additional protocols can be added by implementing this interface.

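Rendered as C#, the contract looks roughly like the sketch below. Member shapes beyond the tree above (async suffixes, `Guid` subscription ids, the `TagValue` payload named in the Common Value Type table, and the `ConnectionHealth` values taken from the connection states tracked later in this document) are assumptions, not confirmed signatures:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

/// <summary>Uniform contract implemented by the OPC UA and LmxProxy adapters.</summary>
public interface IDataConnection : IAsyncDisposable
{
    // Raised by the adapter on unexpected connection loss; observed by DataConnectionActor.
    event Action? Disconnected;

    ConnectionHealth Status { get; }

    Task ConnectAsync(IDictionary<string, string> connectionDetails, CancellationToken ct = default);
    Task DisconnectAsync(CancellationToken ct = default);

    // The callback receives value updates for the subscribed tag path.
    Task<Guid> SubscribeAsync(string tagPath, Action<TagValue> callback, CancellationToken ct = default);
    Task UnsubscribeAsync(Guid subscriptionId, CancellationToken ct = default);

    Task<TagValue> ReadAsync(string tagPath, CancellationToken ct = default);
    Task<IDictionary<string, TagValue>> ReadBatchAsync(IEnumerable<string> tagPaths, CancellationToken ct = default);

    Task WriteAsync(string tagPath, object? value, CancellationToken ct = default);
    Task WriteBatchAsync(IDictionary<string, object?> values, CancellationToken ct = default);

    // Writes a batch plus a "go" flag, then polls responsePath until it reports
    // responseValue or the timeout elapses. Returns true on confirmation.
    Task<bool> WriteBatchAndWaitAsync(
        IDictionary<string, object?> values,
        string flagPath, object? flagValue,
        string responsePath, object? responseValue,
        TimeSpan timeout,
        CancellationToken ct = default);
}

public enum ConnectionHealth { Connected, Disconnected, Reconnecting }

public readonly record struct TagValue(object? Value, QualityCode Quality, DateTimeOffset Timestamp);

public enum QualityCode { Good, Bad, Uncertain }
```

The `WriteBatchAndWait` signature mirrors the write-then-confirm handshake mapped per protocol in the table below.
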
### Concrete Type Mappings

| IDataConnection | OPC UA SDK | LmxProxy (`RealLmxProxyClient`) |
|---|---|---|
| `Connect()` | OPC UA session establishment | gRPC `Connect` RPC with `x-api-key` metadata header, server returns `SessionId` |
| `Disconnect()` | Close OPC UA session | gRPC `Disconnect` RPC |
| `Subscribe(tagPath, callback)` | OPC UA Monitored Items | gRPC `Subscribe` server-streaming RPC (`stream VtqMessage`), cancelled via `CancellationTokenSource` |
| `Unsubscribe(id)` | Remove Monitored Item | Cancel the `CancellationTokenSource` for that subscription (stops the streaming RPC) |
| `Read(tagPath)` | OPC UA Read | gRPC `Read` RPC → `VtqMessage` → `LmxVtq` |
| `ReadBatch(tagPaths)` | OPC UA Read (multiple nodes) | gRPC `ReadBatch` RPC → `repeated VtqMessage` → `IDictionary<string, LmxVtq>` |
| `Write(tagPath, value)` | OPC UA Write | gRPC `Write` RPC (throws on failure) |
| `WriteBatch(values)` | OPC UA Write (multiple nodes) | gRPC `WriteBatch` RPC (throws on failure) |
| `WriteBatchAndWait(...)` | OPC UA Write + poll for confirmation | `WriteBatch` + poll `Read` at 100 ms intervals until the response value matches or the timeout elapses (see the sketch below) |
| `Status` | OPC UA session state | `IsConnected` — true when `SessionId` is non-empty |
| `Disconnected` | `Session.KeepAlive` event fires with a bad `ServiceResult` | gRPC subscription stream ends or throws a non-cancellation `RpcException` |

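The `WriteBatchAndWait(...)` row describes a write-then-poll handshake. A minimal sketch, reusing the `IDataConnection` shape sketched earlier — the 100 ms poll and timeout behavior come from the table, while writing the trigger flag in the same batch as the payload is an assumption:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class WriteBatchAndWaitSketch
{
    // Writes the value batch plus a trigger flag, then polls the response tag
    // every 100 ms until it reports the expected value or the timeout elapses.
    public static async Task<bool> WriteBatchAndWaitAsync(
        IDataConnection connection,
        IDictionary<string, object?> values,
        string flagPath, object? flagValue,
        string responsePath, object? responseValue,
        TimeSpan timeout,
        CancellationToken ct = default)
    {
        // One batch write carries both the payload and the trigger flag (assumed).
        var batch = new Dictionary<string, object?>(values) { [flagPath] = flagValue };
        await connection.WriteBatchAsync(batch, ct);

        var deadline = DateTime.UtcNow + timeout;
        while (DateTime.UtcNow < deadline)
        {
            var current = await connection.ReadAsync(responsePath, ct);
            if (Equals(current.Value, responseValue))
                return true; // device confirmed the write

            await Task.Delay(TimeSpan.FromMilliseconds(100), ct); // fixed poll interval
        }
        return false; // timed out without confirmation
    }
}
```
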
### Common Value Type

Both protocols produce the same value tuple consumed by Instance Actors. Before the first value update arrives from the DCL, data-sourced attributes are held at **uncertain** quality by the Instance Actor (see Site Runtime — Initialization):

| Concept | ScadaLink Design | LmxProxy Wire Format | Local Type |
|---|---|---|---|
| Value container | `TagValue(Value, Quality, Timestamp)` | `VtqMessage { Tag, Value, TimestampUtcTicks, Quality }` | `LmxVtq(Value, TimestampUtc, Quality)` — readonly record struct |
| Quality | `QualityCode` enum: Good / Bad / Uncertain | String: `"Good"` / `"Uncertain"` / `"Bad"` | `LmxQuality` enum: Good / Uncertain / Bad |
| Timestamp | `DateTimeOffset` (UTC) | `int64` (`DateTime.Ticks`, UTC) | `DateTime` (UTC) |
| Value type | `object?` | `string` (parsed by the client to double, bool, or string) | `object?` |

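A sketch of the local types and the wire-to-local conversion implied by this table — the parse order (double, then bool, then raw string) and the quality mapping are stated in the Client Implementation notes below, while the helper shape itself is illustrative:

```csharp
using System;
using System.Globalization;

public enum LmxQuality { Good, Uncertain, Bad }

// Local value container produced by both adapters.
public readonly record struct LmxVtq(object? Value, DateTime TimestampUtc, LmxQuality Quality);

public static class VtqConversion
{
    // Converts LmxProxy VtqMessage fields (string value, int64 ticks, string quality)
    // into the local LmxVtq. Field access is illustrative of the proto shape.
    public static LmxVtq ToLmxVtq(string value, long timestampUtcTicks, string quality)
    {
        object? parsed =
            double.TryParse(value, NumberStyles.Float, CultureInfo.InvariantCulture, out var d) ? d
            : bool.TryParse(value, out var b) ? b
            : value; // fall back to the raw string

        var q = quality switch
        {
            "Good" => LmxQuality.Good,
            "Uncertain" => LmxQuality.Uncertain,
            _ => LmxQuality.Bad, // anything else maps to Bad
        };

        return new LmxVtq(parsed, new DateTime(timestampUtcTicks, DateTimeKind.Utc), q);
    }
}
```
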
## Supported Protocols

### OPC UA

- Uses the **OPC Foundation .NET Standard Library** (`OPCFoundation.NetStandard.Opc.Ua.Client`).
- Session-based connection with endpoint discovery, certificate handling, and configurable security modes.
- Subscriptions via OPC UA Monitored Items with data change notifications (1000 ms sampling, queue size 10, discard-oldest).
- Read/Write via OPC UA Read/Write services with StatusCode-based quality mapping.
- Disconnect detection via the `Session.KeepAlive` event (see the Disconnect Detection Pattern below).

### LmxProxy (Custom Protocol)

LmxProxy is a gRPC-based protocol for communicating with LMX data servers. The DCL includes its own proto-generated gRPC client (`RealLmxProxyClient`) — no external SDK dependency.

**Transport & Connection**:

- gRPC over HTTP/2, using proto-generated client stubs from `scada.proto` (service: `scada.ScadaService`). Pre-generated C# files are checked into `Adapters/LmxProxyGrpc/` to avoid running `protoc` in Docker (ARM64 compatibility).
- Default port: **50051**.
- Session-based: the `Connect` RPC returns a `SessionId` used for all subsequent operations.
- Keep-alive: managed by the LmxProxy server's session timeout. The DCL reconnect cycle handles session loss.

**Authentication & TLS**:

- API key-based authentication sent as an `x-api-key` gRPC metadata header on every call. The server's `ApiKeyInterceptor` validates the header before the request reaches the service method. The API key is also included in the `ConnectRequest` body for session-level validation.
- Plain HTTP/2 (no TLS) for current deployments. The server supports TLS when configured.

**Subscriptions**:

- Server-streaming gRPC (the `Subscribe` RPC returns `stream VtqMessage`).
- Configurable sampling interval (default: 0 = on-change).
- Wire format: `VtqMessage { tag, value (string), timestamp_utc_ticks (int64), quality (string: "Good"/"Uncertain"/"Bad") }`.
- Subscription lifetime managed by a `CancellationTokenSource` — cancellation stops the streaming RPC (see the sketch after this list).

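A minimal sketch of that streaming loop, assuming the proto-generated `ScadaService.ScadaServiceClient` stub and `VtqMessage` from `scada.proto`; the `SubscribeRequest` field names and the `Scada` namespace are assumptions:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Scada; // proto-generated stubs (namespace assumed)

public static class SubscriptionSketch
{
    // Starts a server-streaming Subscribe call and pumps each VtqMessage to the
    // callback. Cancelling the returned CancellationTokenSource stops the RPC.
    public static CancellationTokenSource Subscribe(
        ScadaService.ScadaServiceClient client,
        string sessionId,
        string apiKey,
        string tagPath,
        Action<VtqMessage> onUpdate,
        Action<Exception> onStreamError)
    {
        var cts = new CancellationTokenSource();
        var headers = new Metadata { { "x-api-key", apiKey } }; // sent on every call
        var request = new SubscribeRequest { SessionId = sessionId, Tag = tagPath }; // field names assumed

        _ = Task.Run(async () =>
        {
            try
            {
                using var call = client.Subscribe(request, headers, cancellationToken: cts.Token);
                // Read until the server closes the stream or the token is cancelled.
                while (await call.ResponseStream.MoveNext(cts.Token))
                    onUpdate(call.ResponseStream.Current);

                if (!cts.IsCancellationRequested) // server closed the stream: treat as disconnect
                    onStreamError(new RpcException(new Status(StatusCode.Unavailable, "stream ended")));
            }
            catch (OperationCanceledException)
            {
                // Local Unsubscribe cancelled the RPC; not an error.
            }
            catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
            {
                // Same: local cancellation surfacing as a gRPC status.
            }
            catch (Exception ex) when (!cts.IsCancellationRequested)
            {
                onStreamError(ex); // LmxProxyDataConnection turns this into Disconnected
            }
        });

        return cts;
    }
}
```
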
**Client Implementation** (`RealLmxProxyClient`):

- Uses `Google.Protobuf` + `Grpc.Net.Client` (standard proto-generated stubs, no protobuf-net runtime IL emit).
- `ILmxProxyClientFactory` creates instances configured with host, port, and API key.
- Value conversion: string values from `VtqMessage` are parsed to `double`, `bool`, or left as `string`.
- Quality mapping: `"Good"` → `LmxQuality.Good`, `"Uncertain"` → `LmxQuality.Uncertain`, else `LmxQuality.Bad`.

**Proto Source**: The `.proto` file originates from the LmxProxy server repository (`lmx/Proxy/Grpc/Protos/scada.proto` in ScadaBridge). The C# stubs are pre-generated and stored at `Adapters/LmxProxyGrpc/`.

**Test Infrastructure**: The `infra/lmxfakeproxy/` project provides a fake LmxProxy server that bridges to the OPC UA test server. It implements the full `scada.ScadaService` proto, enabling end-to-end testing of `RealLmxProxyClient` without a Windows LmxProxy deployment. See [test_infra_lmxfakeproxy.md](test_infra_lmxfakeproxy.md) for setup.
## Connection Configuration Reference

All settings are parsed from the data connection's `Configuration` JSON dictionary (stored as `IDictionary<string, string>` connection details). Invalid numeric values fall back to defaults silently (see the parsing sketch below).

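A minimal sketch of that silent-fallback parsing (the helper name `GetInt` is hypothetical):

```csharp
using System.Collections.Generic;
using System.Globalization;

public static class ConnectionDetailsParsing
{
    // Returns the configured integer, or the default when the key is absent
    // or the value does not parse: invalid values never throw.
    public static int GetInt(IDictionary<string, string> details, string key, int defaultValue) =>
        details.TryGetValue(key, out var raw)
            && int.TryParse(raw, NumberStyles.Integer, CultureInfo.InvariantCulture, out var parsed)
            ? parsed
            : defaultValue;

    // Example: var samplingMs = GetInt(details, "SamplingIntervalMs", 1000);
}
```
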
### OPC UA Settings

| Key | Type | Default | Description |
|-----|------|---------|-------------|
| `endpoint` / `EndpointUrl` | string | `opc.tcp://localhost:4840` | OPC UA server endpoint URL |
| `SessionTimeoutMs` | int | `60000` | OPC UA session timeout in milliseconds |
| `OperationTimeoutMs` | int | `15000` | Transport operation timeout in milliseconds |
| `PublishingIntervalMs` | int | `1000` | Subscription publishing interval in milliseconds |
| `KeepAliveCount` | int | `10` | Publishing intervals between keep-alive checks (keep-alive period = `KeepAliveCount` × `PublishingIntervalMs`) |
| `LifetimeCount` | int | `30` | Subscription lifetime in publish intervals |
| `MaxNotificationsPerPublish` | int | `100` | Max notifications batched per publish cycle |
| `SamplingIntervalMs` | int | `1000` | Per-item server sampling rate in milliseconds |
| `QueueSize` | int | `10` | Per-item notification buffer size |
| `SecurityMode` | string | `None` | Preferred endpoint security: `None`, `Sign`, or `SignAndEncrypt` |
| `AutoAcceptUntrustedCerts` | bool | `true` | Accept untrusted server certificates |
### LmxProxy Settings

| Key | Type | Default | Description |
|-----|------|---------|-------------|
| `Host` | string | `localhost` | LmxProxy server hostname |
| `Port` | int | `50051` | LmxProxy gRPC port |
| `ApiKey` | string | *(none)* | API key for `x-api-key` header authentication |
| `SamplingIntervalMs` | int | `0` | Subscription sampling interval: 0 = on-change, >0 = time-based (ms) |
| `UseTls` | bool | `false` | Use HTTPS instead of plain HTTP/2 for the gRPC channel |
### Shared Settings (appsettings.json)

These are configured via `DataConnectionOptions` in `appsettings.json`, not per-connection:

| Setting | Default | Description |
|---------|---------|-------------|
| `ReconnectInterval` | 5s | Fixed interval between reconnection attempts |
| `TagResolutionRetryInterval` | 10s | Retry interval for unresolved tag paths |
| `WriteTimeout` | 30s | Timeout for write operations |
| `LmxProxyKeepAliveInterval` | 30s | Keep-alive ping interval for LmxProxy sessions |
## Subscription Management

- When an Instance Actor is created (as part of the Site Runtime actor hierarchy), it registers its data source references with the Data Connection Layer.
- The DCL subscribes to the tag paths using the concrete connection details from the flattened configuration.
- Tag value updates are delivered directly to the requesting Instance Actor.
- When an Instance Actor is stopped (due to disable, delete, or redeployment), the DCL cleans up the associated subscriptions.
- When a new Instance Actor is created for a redeployment, subscriptions are established fresh based on the new configuration.
## Write-Back Support

- When a script calls `Instance.SetAttribute` for an attribute with a data source reference, the Instance Actor sends a write request to the DCL.
- The DCL writes the value to the physical device via the appropriate protocol.
- The existing subscription picks up the confirmed new value from the device and delivers it back to the Instance Actor as a standard value update.
- The Instance Actor's in-memory value is **not** updated until the device confirms the write.
## Value Update Message Format

Each value update delivered to an Instance Actor includes the following fields (see the record sketch after this list):

- **Tag path**: The relative path of the attribute's data source reference.
- **Value**: The new value from the device.
- **Quality**: Data quality indicator (good, bad, uncertain).
- **Timestamp**: When the value was read from the device.

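Sketched as a C# record, reusing `LmxQuality` from the conversion sketch above (the name `TagValueUpdate` and the exact field types are illustrative, not confirmed):

```csharp
using System;

// Hypothetical shape of the update delivered from the DCL to an Instance Actor.
public readonly record struct TagValueUpdate(
    string TagPath,          // relative path from the attribute's data source reference
    object? Value,           // new value from the device
    LmxQuality Quality,      // Good / Uncertain / Bad
    DateTime TimestampUtc);  // when the value was read from the device
```
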
## Connection Actor Model

Each data connection is managed by a dedicated connection actor that uses the Akka.NET **Become/Stash** pattern to model its lifecycle as a state machine:

- **Connecting**: The actor attempts to establish the connection. Subscription requests and write commands received during this phase are **stashed** (buffered in the actor's stash).
- **Connected**: The actor is actively servicing subscriptions. On entering this state, all stashed messages are unstashed and processed.
- **Reconnecting**: The connection was lost. The actor transitions back to a connecting-like state, stashing new requests while it retries.

This pattern ensures no messages are lost during connection transitions and is the standard Akka.NET approach for actors with I/O lifecycle dependencies. A condensed sketch follows.

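A condensed sketch of the state machine, assuming Akka.NET's `ReceiveActor` with `IWithUnboundedStash`; the message types and the adapter calls in comments are simplified stand-ins for the real ones:

```csharp
using System;
using Akka.Actor;

public sealed record SubscribeTag(string TagPath, IActorRef Requester);
public sealed record WriteTag(string TagPath, object? Value);
public sealed record ConnectSucceeded;
public sealed record AdapterDisconnected;
public sealed record RetryConnect;

public class DataConnectionActor : ReceiveActor, IWithUnboundedStash
{
    public IStash Stash { get; set; } = null!;

    public DataConnectionActor() => BecomeConnecting();

    private void BecomeConnecting()
    {
        Become(() =>
        {
            Receive<ConnectSucceeded>(_ =>
            {
                Stash.UnstashAll();   // replay everything buffered while connecting
                BecomeConnected();
            });
            Receive<RetryConnect>(_ => { /* call the adapter's ConnectAsync again */ });
            ReceiveAny(_ => Stash.Stash()); // buffer subscriptions/writes until connected
        });
    }

    private void BecomeConnected()
    {
        Become(() =>
        {
            Receive<SubscribeTag>(msg => { /* subscribe via the adapter, route updates */ });
            Receive<WriteTag>(msg => { /* forward the write to the adapter */ });
            Receive<AdapterDisconnected>(_ =>
            {
                // Push bad quality for all subscribed tags, then retry on a fixed interval.
                Context.System.Scheduler.ScheduleTellOnce(
                    TimeSpan.FromSeconds(5), Self, new RetryConnect(), Self);
                BecomeConnecting(); // Reconnecting behaves like Connecting: stash new requests
            });
        });
    }
}
```
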
**LmxProxy-specific notes**: The `RealLmxProxyClient` holds the `SessionId` returned by the `Connect` RPC and includes it in all subsequent operations. Subscriptions use server-streaming gRPC — a background task reads from the `ResponseStream` and invokes the callback for each `VtqMessage`. When the stream breaks (server offline, network failure), the background task detects the `RpcException` or stream end and invokes the `onStreamError` callback, which triggers the adapter's `Disconnected` event. The DCL actor transitions to **Reconnecting**, pushes bad quality, disposes the client, and retries at the fixed interval.

**OPC UA-specific notes**: The `RealOpcUaClient` uses the OPC Foundation SDK's `Session.KeepAlive` event for proactive disconnect detection. The SDK sends keep-alive requests every `KeepAliveCount × PublishingInterval` (default: 10s). When keep-alive fails, the `ConnectionLost` event fires, triggering the same reconnection flow. On reconnection, the DCL re-creates the OPC UA session and subscription, then re-adds all monitored items.
## Connection Lifecycle & Reconnection

The DCL manages connection lifecycle automatically:

1. **Connection drop detection**: When a connection to a data source is lost, the DCL immediately pushes a value update with quality `bad` for **every tag subscribed on that connection**. Instance Actors and their downstream consumers (alarms, scripts checking quality) see the staleness immediately.
2. **Auto-reconnect with fixed interval**: The DCL retries the connection at a fixed interval configured via the shared `ReconnectInterval` setting (default: every 5 seconds; see Shared Settings above). This is consistent with the fixed-interval retry philosophy used throughout the system. Individual gRPC/OPC UA operations (reads, writes) fail immediately to the caller on error; there is no operation-level retry within the adapter.
3. **Connection state transitions**: The DCL tracks each connection's state as `connected`, `disconnected`, or `reconnecting`. All transitions are logged to Site Event Logging.
4. **Transparent re-subscribe**: On successful reconnection, the DCL automatically re-establishes all previously active subscriptions for that connection. Instance Actors require no action — they simply see quality return to `good` as fresh values arrive from restored subscriptions.
### Disconnect Detection Pattern

Each adapter implements the `IDataConnection.Disconnected` event to proactively signal connection loss to the `DataConnectionActor`. Detection uses two complementary paths.

**Proactive detection** (server goes offline between operations):
- **OPC UA**: The OPC Foundation SDK fires `Session.KeepAlive` events at regular intervals. `RealOpcUaClient` hooks this event; when `ServiceResult.IsBad(e.Status)` (server unreachable, keep-alive timeout), it fires `ConnectionLost`. The `OpcUaDataConnection` adapter translates this into `IDataConnection.Disconnected` (see the sketch after this list).
- **LmxProxy**: gRPC server-streaming subscriptions run in background tasks reading from `ResponseStream`. When the server goes offline, the stream either ends normally (server closed) or throws a non-cancellation `RpcException`. `RealLmxProxyClient` invokes the `onStreamError` callback, which `LmxProxyDataConnection` translates into `IDataConnection.Disconnected`.

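A sketch of that hookup against the OPC Foundation client SDK; the `KeepAlive` delegate signature has varied across SDK versions (older versions pass `Session` rather than `ISession`), so treat the exact signature as an assumption:

```csharp
using System;
using Opc.Ua;
using Opc.Ua.Client;

public sealed class KeepAliveHookSketch
{
    public event Action? ConnectionLost;

    public void Attach(Session session)
    {
        // The SDK raises KeepAlive on every keep-alive response (or failure).
        session.KeepAlive += OnKeepAlive;
    }

    private void OnKeepAlive(ISession session, KeepAliveEventArgs e)
    {
        // A bad ServiceResult means the server is unreachable or the
        // keep-alive timed out: signal the reconnection state machine.
        if (ServiceResult.IsBad(e.Status))
            ConnectionLost?.Invoke();
    }
}
```
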
**Reactive detection** (failure discovered during an operation):

- Both adapters wrap `ReadAsync` (and by extension `ReadBatchAsync`) with exception handling. If a read throws a non-cancellation exception, the adapter calls `RaiseDisconnected()` and re-throws. The `DataConnectionActor`'s existing error handling catches the exception while the disconnect event triggers the reconnection state machine (see the sketch below).

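A sketch of that wrapper, reusing `LmxVtq` from the conversion sketch above (the abstract base and helper names are illustrative; the real adapters wrap their own protocol reads):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public abstract class DisconnectAwareAdapter
{
    public event Action? Disconnected;

    // Actual protocol read, implemented per adapter (OPC UA or LmxProxy).
    protected abstract Task<LmxVtq> ReadCoreAsync(string tagPath, CancellationToken ct);

    public async Task<LmxVtq> ReadAsync(string tagPath, CancellationToken ct = default)
    {
        try
        {
            return await ReadCoreAsync(tagPath, ct);
        }
        catch (OperationCanceledException)
        {
            throw; // cancellation is not a disconnect
        }
        catch (Exception)
        {
            RaiseDisconnected(); // signal the reconnection state machine
            throw;               // the caller still sees the original failure
        }
    }

    protected void RaiseDisconnected() => Disconnected?.Invoke();
}
```
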
**Event marshalling**: The `DataConnectionActor` subscribes to `_adapter.Disconnected` in `PreStart()`. Since `Disconnected` may fire from a background thread (gRPC stream task, OPC UA keep-alive timer), the handler sends an `AdapterDisconnected` message to `Self`, marshalling the notification onto the actor's message loop. This triggers `BecomeReconnecting()` → bad quality push → retry timer.

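A sketch of that `PreStart` hookup, reusing the `DisconnectAwareAdapter` sketch above (the `AdapterDisconnected` message mirrors the state-machine sketch):

```csharp
using Akka.Actor;

public sealed record AdapterDisconnected;

public class AdapterOwningActor : ReceiveActor
{
    private readonly DisconnectAwareAdapter _adapter;

    public AdapterOwningActor(DisconnectAwareAdapter adapter)
    {
        _adapter = adapter;
        Receive<AdapterDisconnected>(_ => { /* BecomeReconnecting(): bad quality push, retry timer */ });
    }

    protected override void PreStart()
    {
        // Disconnected may fire on a gRPC or keep-alive thread; never touch actor
        // state from the event handler. Marshal onto the mailbox via Tell instead.
        var self = Self; // capture: Self is only valid on the actor's own context
        _adapter.Disconnected += () => self.Tell(new AdapterDisconnected());
    }
}
```
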
**Once-only guard**: Both `LmxProxyDataConnection` and `OpcUaDataConnection` use a `volatile bool _disconnectFired` flag to ensure `RaiseDisconnected()` fires exactly once per connection session. The flag resets on successful reconnection (`ConnectAsync`).

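The document specifies a `volatile bool` flag; the sketch below uses `Interlocked` on an `int` instead, which gives a strictly once-only guarantee even when two threads race (a bare volatile check-then-set could fire twice). Treat that substitution as an implementation note, not the confirmed code:

```csharp
using System;
using System.Threading;

public sealed class DisconnectGuard
{
    private int _disconnectFired; // 0 = not fired, 1 = fired

    public event Action? Disconnected;

    // Fires Disconnected at most once per connection session.
    public void RaiseDisconnected()
    {
        if (Interlocked.Exchange(ref _disconnectFired, 1) == 0)
            Disconnected?.Invoke();
    }

    // Called from ConnectAsync after a successful (re)connect.
    public void Reset() => Interlocked.Exchange(ref _disconnectFired, 0);
}
```
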
## Write Failure Handling

Writes to physical devices are **synchronous** from the script's perspective:

- If the write fails (connection down, device rejection, timeout), the error is **returned to the calling script**. Script authors can catch and handle write errors (log, notify, retry, etc.).
- Write failures are also logged to Site Event Logging.
- There is **no store-and-forward for device writes** — these are real-time control operations. Buffering stale setpoints for later application would be dangerous in an industrial context.
## Tag Path Resolution

When the DCL subscribes to a tag path from the flattened configuration but the path does not exist on the physical device (e.g., a typo in the template, changed device firmware, or a device still booting):

1. The failure is **logged to Site Event Logging**.
2. The attribute is marked with quality `bad`.
3. The DCL **periodically retries resolution** at a configurable interval (`TagResolutionRetryInterval`), accommodating devices that come online in stages or load modules after startup (see the sketch after this list).
4. On successful resolution, the subscription activates normally and quality reflects the live value from the device.

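A sketch of the periodic retry using the Akka.NET scheduler (the message type and tag bookkeeping are illustrative; the interval corresponds to `TagResolutionRetryInterval`):

```csharp
using System;
using System.Collections.Generic;
using Akka.Actor;

public sealed record RetryTagResolution;

public class TagResolutionSketchActor : ReceiveActor
{
    private readonly HashSet<string> _unresolved = new(); // tag paths that failed to resolve
    private readonly TimeSpan _retryInterval = TimeSpan.FromSeconds(10); // TagResolutionRetryInterval

    public TagResolutionSketchActor()
    {
        Receive<RetryTagResolution>(_ =>
        {
            foreach (var tagPath in _unresolved)
            {
                // Attempt Subscribe again; on success, drop the path from _unresolved
                // and quality transitions from bad to the live device value.
            }
        });
    }

    protected override void PreStart() =>
        // Fixed-interval retry tick, consistent with the system's retry philosophy.
        Context.System.Scheduler.ScheduleTellRepeatedly(
            _retryInterval, _retryInterval, Self, new RetryTagResolution(), Self);
}
```
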
Note: Pre-deployment validation at central does **not** verify that tag paths resolve to real tags on physical devices — that is a runtime concern handled here.

## Health Reporting

The DCL reports the following metrics to the Health Monitoring component via the existing periodic heartbeat:

- **Connection status**: `connected`, `disconnected`, or `reconnecting` per data connection.
- **Tag resolution counts**: Per connection, the number of total subscribed tags vs. successfully resolved tags. This gives operators visibility into misconfigured templates without needing to open the debug view for individual instances.

## Dependencies

- **Site Runtime (Instance Actors)**: Receives subscription registrations and delivers value updates. Receives write requests.
- **Health Monitoring**: Reports connection status.
- **Site Event Logging**: Logs connection status changes.

## Interactions

- **Site Runtime (Instance Actors)**: Bidirectional — delivers value updates, receives subscription registrations and write-back commands.
- **Health Monitoring**: Reports connection health periodically.
- **Site Event Logging**: Logs connection/disconnection events.