# Component: Data Connection Layer
## Purpose
The Data Connection Layer provides a uniform interface for reading from and writing to physical machines at site clusters. It abstracts protocol-specific details behind a common interface, manages subscriptions, and delivers live tag value updates to Instance Actors. It is a **clean data pipe** — it performs no evaluation of triggers, alarm conditions, or business logic.
## Location
Site clusters only. Central does not interact with machines directly.
## Responsibilities
- Manage data connections defined centrally and deployed to sites as part of artifact deployment (OPC UA servers, LmxProxy endpoints). Data connection definitions are stored in local SQLite after deployment.
- Establish and maintain connections to data sources based on deployed instance configurations.
- Subscribe to tag paths as requested by Instance Actors (based on attribute data source references in the flattened configuration).
- Deliver tag value updates to the requesting Instance Actors.
- Support writing values to machines (when Instance Actors forward `SetAttribute` write requests for data-connected attributes).
- Report data connection health status to the Health Monitoring component.
## Common Interface
Both OPC UA and LmxProxy implement the same interface:
```
IDataConnection : IAsyncDisposable
├── Connect(connectionDetails) → void
├── Disconnect() → void
├── Subscribe(tagPath, callback) → subscriptionId
├── Unsubscribe(subscriptionId) → void
├── Read(tagPath) → value
├── ReadBatch(tagPaths) → values
├── Write(tagPath, value) → void
├── WriteBatch(values) → void
├── WriteBatchAndWait(values, flagPath, flagValue, responsePath, responseValue, timeout) → bool
├── Status → ConnectionHealth
└── Disconnected → event Action?
```
The `Disconnected` event is raised by an adapter when it detects an unexpected connection loss (server offline, network failure, keep-alive timeout). The `DataConnectionActor` subscribes to this event to trigger the reconnection state machine. Additional protocols can be added by implementing this interface.
### Concrete Type Mappings
| IDataConnection | OPC UA SDK | LmxProxy (`RealLmxProxyClient`) |
|---|---|---|
| `Connect()` | OPC UA session establishment | gRPC `Connect` RPC with `x-api-key` metadata header, server returns `SessionId` |
| `Disconnect()` | Close OPC UA session | gRPC `Disconnect` RPC |
| `Subscribe(tagPath, callback)` | OPC UA Monitored Items | gRPC `Subscribe` server-streaming RPC (`stream VtqMessage`), cancelled via `CancellationTokenSource` |
| `Unsubscribe(id)` | Remove Monitored Item | Cancel the `CancellationTokenSource` for that subscription (stops streaming RPC) |
| `Read(tagPath)` | OPC UA Read | gRPC `Read` RPC → `VtqMessage` → `LmxVtq` |
| `ReadBatch(tagPaths)` | OPC UA Read (multiple nodes) | gRPC `ReadBatch` RPC → `repeated VtqMessage` → `IDictionary<string, LmxVtq>` |
| `Write(tagPath, value)` | OPC UA Write | gRPC `Write` RPC (throws on failure) |
| `WriteBatch(values)` | OPC UA Write (multiple nodes) | gRPC `WriteBatch` RPC (throws on failure) |
| `WriteBatchAndWait(...)` | OPC UA Write + poll for confirmation | `WriteBatch` + poll `Read` at 100ms intervals until response value matches or timeout |
| `Status` | OPC UA session state | `IsConnected` — true when `SessionId` is non-empty |
| `Disconnected` | `Session.KeepAlive` event fires with bad `ServiceResult` | gRPC subscription stream ends or throws non-cancellation `RpcException` |
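The `WriteBatchAndWait` row describes a write-then-poll confirmation pattern. A minimal, language-agnostic Python sketch of that pattern (the `conn` object and its methods are hypothetical stand-ins for an `IDataConnection` adapter, not the actual C# implementation):

```python
import time

def write_batch_and_wait(conn, values, flag_path, flag_value,
                         response_path, response_value, timeout_s):
    """Write a batch plus a trigger flag, then poll the response tag
    until the device acknowledges or the timeout elapses."""
    # Send the setpoints and the "go" flag in one batch write.
    conn.write_batch({**values, flag_path: flag_value})
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if conn.read(response_path) == response_value:
            return True          # device confirmed the write
        time.sleep(0.1)          # 100 ms poll interval, per the mapping table
    return False                 # timed out without confirmation
```

The boolean return mirrors the `→ bool` signature in the interface tree: the caller learns whether the device confirmed, rather than receiving an exception on timeout.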
### Common Value Type
Both protocols produce the same value tuple consumed by Instance Actors. Before the first value update arrives from the DCL, data-sourced attributes are held at **uncertain** quality by the Instance Actor (see Site Runtime — Initialization):
| Concept | ScadaLink Design | LmxProxy Wire Format | Local Type |
|---|---|---|---|
| Value container | `TagValue(Value, Quality, Timestamp)` | `VtqMessage { Tag, Value, TimestampUtcTicks, Quality }` | `LmxVtq(Value, TimestampUtc, Quality)` — readonly record struct |
| Quality | `QualityCode` enum: Good / Bad / Uncertain | String: `"Good"` / `"Uncertain"` / `"Bad"` | `LmxQuality` enum: Good / Uncertain / Bad |
| Timestamp | `DateTimeOffset` (UTC) | `int64` (DateTime.Ticks, UTC) | `DateTime` (UTC) |
| Value type | `object?` | `string` (parsed by client to double, bool, or string) | `object?` |
## Supported Protocols
### OPC UA
- Uses the **OPC Foundation .NET Standard Library** (`OPCFoundation.NetStandard.Opc.Ua.Client`).
- Session-based connection with endpoint discovery, certificate handling, and configurable security modes.
- Subscriptions via OPC UA Monitored Items with data change notifications (1000ms sampling, queue size 10, discard-oldest).
- Read/Write via OPC UA Read/Write services with StatusCode-based quality mapping.
- Disconnect detection via `Session.KeepAlive` event (see Disconnect Detection Pattern below).
### LmxProxy (Custom Protocol)
LmxProxy is a gRPC-based protocol for communicating with LMX data servers. The DCL includes its own proto-generated gRPC client (`RealLmxProxyClient`) — no external SDK dependency.
**Transport & Connection**:
- gRPC over HTTP/2, using proto-generated client stubs from `scada.proto` (service: `scada.ScadaService`). Pre-generated C# files are checked into `Adapters/LmxProxyGrpc/` to avoid running `protoc` in Docker (ARM64 compatibility).
- Default port: **50051**.
- Session-based: `Connect` RPC returns a `SessionId` used for all subsequent operations.
- Keep-alive: Managed by the LmxProxy server's session timeout. The DCL reconnect cycle handles session loss.
**Authentication & TLS**:
- API key-based authentication sent as `x-api-key` gRPC metadata header on every call. The server's `ApiKeyInterceptor` validates the header before the request reaches the service method. The API key is also included in the `ConnectRequest` body for session-level validation.
- Plain HTTP/2 (no TLS) for current deployments. The server supports TLS when configured.
**Subscriptions**:
- Server-streaming gRPC (`Subscribe` RPC returns `stream VtqMessage`).
- Configurable sampling interval (default: 0 = on-change).
- Wire format: `VtqMessage { tag, value (string), timestamp_utc_ticks (int64), quality (string: "Good"/"Uncertain"/"Bad") }`.
- Subscription lifetime managed by `CancellationTokenSource` — cancellation stops the streaming RPC.
**Client Implementation** (`RealLmxProxyClient`):
- Uses `Google.Protobuf` + `Grpc.Net.Client` (standard proto-generated stubs, no protobuf-net runtime IL emit).
- `ILmxProxyClientFactory` creates instances configured with host, port, and API key.
- Value conversion: string values from `VtqMessage` are parsed to `double`, `bool`, or left as `string`.
- Quality mapping: `"Good"` → `LmxQuality.Good`, `"Uncertain"` → `LmxQuality.Uncertain`, else `LmxQuality.Bad`.
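The value conversion and quality mapping described above can be sketched in a few lines of Python. This is an illustrative approximation, not the `RealLmxProxyClient` source; in particular, the order in which boolean and numeric parses are attempted is an assumption:

```python
from enum import Enum

class LmxQuality(Enum):
    Good = 0
    Uncertain = 1
    Bad = 2

def map_quality(wire: str) -> LmxQuality:
    # "Good" → Good, "Uncertain" → Uncertain, anything else → Bad.
    if wire == "Good":
        return LmxQuality.Good
    if wire == "Uncertain":
        return LmxQuality.Uncertain
    return LmxQuality.Bad

def parse_value(wire: str):
    """Parse the string value from a VtqMessage to bool, double, or string."""
    low = wire.lower()
    if low in ("true", "false"):     # assumption: bool is tried before numeric
        return low == "true"
    try:
        return float(wire)           # "double" in the wire-format description
    except ValueError:
        return wire                  # fall back to the raw string
```

Note the defensive default in `map_quality`: an unrecognized quality string degrades to `Bad` rather than raising, so a protocol extension on the server cannot crash the subscription stream.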
**Proto Source**: The `.proto` file originates from the LmxProxy server repository (`lmx/Proxy/Grpc/Protos/scada.proto` in ScadaBridge). The C# stubs are pre-generated and stored at `Adapters/LmxProxyGrpc/`.
**Test Infrastructure**: The `infra/lmxfakeproxy/` project provides a fake LmxProxy server that bridges to the OPC UA test server. It implements the full `scada.ScadaService` proto, enabling end-to-end testing of `RealLmxProxyClient` without a Windows LmxProxy deployment. See [test_infra_lmxfakeproxy.md](../test_infra/test_infra_lmxfakeproxy.md) for setup.
## Connection Configuration Reference
All settings are parsed from the data connection's `Configuration` JSON dictionary (stored as `IDictionary<string, string>` connection details). Invalid numeric values fall back to defaults silently.
### OPC UA Settings
| Key | Type | Default | Description |
|-----|------|---------|-------------|
| `endpoint` / `EndpointUrl` | string | `opc.tcp://localhost:4840` | OPC UA server endpoint URL |
| `SessionTimeoutMs` | int | `60000` | OPC UA session timeout in milliseconds |
| `OperationTimeoutMs` | int | `15000` | Transport operation timeout in milliseconds |
| `PublishingIntervalMs` | int | `1000` | Subscription publishing interval in milliseconds |
| `KeepAliveCount` | int | `10` | Publishing intervals with no data before a keep-alive notification is sent |
| `LifetimeCount` | int | `30` | Subscription lifetime in publish intervals |
| `MaxNotificationsPerPublish` | int | `100` | Max notifications batched per publish cycle |
| `SamplingIntervalMs` | int | `1000` | Per-item server sampling rate in milliseconds |
| `QueueSize` | int | `10` | Per-item notification buffer size |
| `SecurityMode` | string | `None` | Preferred endpoint security: `None`, `Sign`, or `SignAndEncrypt` |
| `AutoAcceptUntrustedCerts` | bool | `true` | Accept untrusted server certificates |
### LmxProxy Settings
| Key | Type | Default | Description |
|-----|------|---------|-------------|
| `Host` | string | `localhost` | LmxProxy server hostname |
| `Port` | int | `50051` | LmxProxy gRPC port |
| `ApiKey` | string | *(none)* | API key for `x-api-key` header authentication |
| `SamplingIntervalMs` | int | `0` | Subscription sampling interval: 0 = on-change, >0 = time-based (ms) |
| `UseTls` | bool | `false` | Use HTTPS instead of plain HTTP/2 for gRPC channel |
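The "invalid numeric values fall back to defaults silently" rule above can be captured in a small helper. A hedged Python sketch (the helper name and the exact fallback behavior for missing vs. malformed keys are assumptions consistent with the stated rule):

```python
def get_int(details: dict, key: str, default: int) -> int:
    """Parse an int setting from the connection-details dictionary.

    Missing or invalid values fall back to the default silently,
    matching the documented parsing rule for numeric settings.
    """
    raw = details.get(key)
    if raw is None:
        return default
    try:
        return int(raw)
    except ValueError:
        return default

details = {"Port": "50051", "SamplingIntervalMs": "oops"}
# get_int(details, "Port", 0)                → 50051
# get_int(details, "SamplingIntervalMs", 0)  → 0   (invalid falls back)
# get_int(details, "Missing", 7)             → 7   (absent falls back)
```

Silent fallback keeps a typo in one setting from blocking the whole connection, at the cost of masking the misconfiguration; the health-reporting section below is what surfaces such problems to operators.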
### Shared Settings (appsettings.json)
These are configured via `DataConnectionOptions` in `appsettings.json`, not per-connection:
| Setting | Default | Description |
|---------|---------|-------------|
| `ReconnectInterval` | 5s | Fixed interval between reconnection attempts |
| `TagResolutionRetryInterval` | 10s | Retry interval for unresolved tag paths |
| `WriteTimeout` | 30s | Timeout for write operations |
| `LmxProxyKeepAliveInterval` | 30s | Keep-alive ping interval for LmxProxy sessions |
## Subscription Management
- When an Instance Actor is created (as part of the Site Runtime actor hierarchy), it registers its data source references with the Data Connection Layer.
- The DCL subscribes to the tag paths using the concrete connection details from the flattened configuration.
- Tag value updates are delivered directly to the requesting Instance Actor.
- When an Instance Actor is stopped (due to disable, delete, or redeployment), the DCL cleans up the associated subscriptions.
- When a new Instance Actor is created for a redeployment, subscriptions are established fresh based on the new configuration.
## Write-Back Support
- When a script calls `Instance.SetAttribute` for an attribute with a data source reference, the Instance Actor sends a write request to the DCL.
- The DCL writes the value to the physical device via the appropriate protocol.
- The existing subscription picks up the confirmed new value from the device and delivers it back to the Instance Actor as a standard value update.
- The Instance Actor's in-memory value is **not** updated until the device confirms the write.
## Value Update Message Format
Each value update delivered to an Instance Actor includes:
- **Tag path**: The relative path of the attribute's data source reference.
- **Value**: The new value from the device.
- **Quality**: Data quality indicator (good, bad, uncertain).
- **Timestamp**: When the value was read from the device.
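The four fields above map naturally onto an immutable record. A Python sketch of the update message shape (type and field names are illustrative, chosen to mirror the `TagValue(Value, Quality, Timestamp)` container from the Common Value Type table):

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum

class QualityCode(Enum):
    Good = "good"
    Bad = "bad"
    Uncertain = "uncertain"

@dataclass(frozen=True)
class TagValueUpdate:
    tag_path: str            # relative path of the attribute's data source reference
    value: object            # new value from the device
    quality: QualityCode     # good / bad / uncertain
    timestamp: datetime      # UTC time the value was read from the device

update = TagValueUpdate(
    "Pump1/Flow", 3.2, QualityCode.Good, datetime.now(timezone.utc)
)
```

Freezing the record fits the actor model used below: messages passed between the DCL and Instance Actors should be immutable so neither side can mutate state the other is still reading.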
## Connection Actor Model
Each data connection is managed by a dedicated connection actor that uses the Akka.NET **Become/Stash** pattern to model its lifecycle as a state machine:
- **Connecting**: The actor attempts to establish the connection. Subscription requests and write commands received during this phase are **stashed** (buffered in the actor's stash).
- **Connected**: The actor is actively servicing subscriptions. On entering this state, all stashed messages are unstashed and processed.
- **Reconnecting**: The connection was lost. The actor transitions back to a connecting-like state, stashing new requests while it retries.
This pattern ensures no messages are lost during connection transitions and is the standard Akka.NET approach for actors with I/O lifecycle dependencies.
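The Become/Stash lifecycle can be illustrated with a deliberately minimal, single-threaded Python sketch (Akka.NET provides `Become`, `Stash`, and `UnstashAll` natively; the class and message names here are hypothetical):

```python
class ConnectionActor:
    """Sketch of the Become/Stash pattern: messages arriving while
    Connecting are stashed, then replayed on entering Connected."""

    def __init__(self):
        self._stash = []
        self._behavior = self.connecting   # initial state: Connecting
        self.handled = []

    def receive(self, msg):
        self._behavior(msg)

    def connecting(self, msg):
        if msg == "connection_established":
            self._behavior = self.connected    # Become(Connected)
            stashed, self._stash = self._stash, []
            for m in stashed:                  # UnstashAll: replay in order
                self.receive(m)
        else:
            self._stash.append(msg)            # Stash subscribe/write requests

    def connected(self, msg):
        self.handled.append(msg)               # actively service requests
```

On connection loss the real actor swaps back to a connecting-like behavior and the same stashing resumes, which is why no subscription or write request is ever dropped during a transition.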
**LmxProxy-specific notes**: The `RealLmxProxyClient` holds the `SessionId` returned by the `Connect` RPC and includes it in all subsequent operations. Subscriptions use server-streaming gRPC — a background task reads from the `ResponseStream` and invokes the callback for each `VtqMessage`. When the stream breaks (server offline, network failure), the background task detects the `RpcException` or stream end and invokes the `onStreamError` callback, which triggers the adapter's `Disconnected` event. The DCL actor transitions to **Reconnecting**, pushes bad quality, disposes the client, and retries at the fixed interval.
**OPC UA-specific notes**: The `RealOpcUaClient` uses the OPC Foundation SDK's `Session.KeepAlive` event for proactive disconnect detection. The SDK sends keep-alive requests at the subscription's `KeepAliveCount × PublishingInterval` (default: 10s). When keep-alive fails, the `ConnectionLost` event fires, triggering the same reconnection flow. On reconnection, the DCL re-creates the OPC UA session and subscription, then re-adds all monitored items.
## Connection Lifecycle & Reconnection
The DCL manages connection lifecycle automatically:
1. **Connection drop detection**: When a connection to a data source is lost, the DCL immediately pushes a value update with quality `bad` for **every tag subscribed on that connection**. Instance Actors and their downstream consumers (alarms, scripts checking quality) see the staleness immediately.
2. **Auto-reconnect with fixed interval**: The DCL retries the connection at a configurable fixed interval (`ReconnectInterval` in `DataConnectionOptions`, default 5 seconds), applied uniformly across data connections. This is consistent with the fixed-interval retry philosophy used throughout the system. Individual gRPC/OPC UA operations (reads, writes) fail immediately to the caller on error; there is no operation-level retry within the adapter.
3. **Connection state transitions**: The DCL tracks each connection's state as `connected`, `disconnected`, or `reconnecting`. All transitions are logged to Site Event Logging.
4. **Transparent re-subscribe**: On successful reconnection, the DCL automatically re-establishes all previously active subscriptions for that connection. Instance Actors require no action — they simply see quality return to `good` as fresh values arrive from restored subscriptions.
### Disconnect Detection Pattern
Each adapter implements the `IDataConnection.Disconnected` event to proactively signal connection loss to the `DataConnectionActor`. Detection uses two complementary paths:
**Proactive detection** (server goes offline between operations):
- **OPC UA**: The OPC Foundation SDK fires `Session.KeepAlive` events at regular intervals. `RealOpcUaClient` hooks this event; when `ServiceResult.IsBad(e.Status)` (server unreachable, keep-alive timeout), it fires `ConnectionLost`. The `OpcUaDataConnection` adapter translates this into `IDataConnection.Disconnected`.
- **LmxProxy**: gRPC server-streaming subscriptions run in background tasks reading from `ResponseStream`. When the server goes offline, the stream either ends normally (server closed) or throws a non-cancellation `RpcException`. `RealLmxProxyClient` invokes the `onStreamError` callback, which `LmxProxyDataConnection` translates into `IDataConnection.Disconnected`.
**Reactive detection** (failure discovered during an operation):
- Both adapters wrap `ReadAsync` (and by extension `ReadBatchAsync`) with exception handling. If a read throws a non-cancellation exception, the adapter calls `RaiseDisconnected()` and re-throws. The `DataConnectionActor`'s existing error handling catches the exception while the disconnect event triggers the reconnection state machine.
**Event marshalling**: The `DataConnectionActor` subscribes to `_adapter.Disconnected` in `PreStart()`. Since `Disconnected` may fire from a background thread (gRPC stream task, OPC UA keep-alive timer), the handler sends an `AdapterDisconnected` message to `Self`, marshalling the notification onto the actor's message loop. This triggers `BecomeReconnecting()` → bad quality push → retry timer.
**Once-only guard**: Both `LmxProxyDataConnection` and `OpcUaDataConnection` use a `volatile bool _disconnectFired` flag to ensure `RaiseDisconnected()` fires exactly once per connection session. The flag resets on successful reconnection (`ConnectAsync`).
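The once-only guard can be sketched as a small thread-safe wrapper. This Python version uses a lock where the C# adapters use a `volatile bool` (the class and method names here are illustrative, not the actual adapter API):

```python
import threading

class DisconnectGuard:
    """Ensures the Disconnected event fires at most once per
    connection session; reset after a successful reconnect."""

    def __init__(self, on_disconnected):
        self._lock = threading.Lock()
        self._fired = False
        self._on_disconnected = on_disconnected

    def raise_disconnected(self):
        # First caller wins: concurrent stream-error and read-failure
        # paths collapse into a single Disconnected notification.
        with self._lock:
            if self._fired:
                return
            self._fired = True
        self._on_disconnected()

    def reset(self):
        # Called on successful reconnection (the ConnectAsync path),
        # re-arming the guard for the new session.
        with self._lock:
            self._fired = False
```

The guard matters because proactive detection (keep-alive, stream end) and reactive detection (a failing read) can race: without it, the `DataConnectionActor` could receive duplicate `AdapterDisconnected` messages for one outage.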
## Write Failure Handling
Writes to physical devices are **synchronous** from the script's perspective:
- If the write fails (connection down, device rejection, timeout), the error is **returned to the calling script**. Script authors can catch and handle write errors (log, notify, retry, etc.).
- Write failures are also logged to Site Event Logging.
- There is **no store-and-forward for device writes** — these are real-time control operations. Buffering stale setpoints for later application would be dangerous in an industrial context.
## Tag Path Resolution
When the DCL subscribes to a tag path from the flattened configuration but the path does not exist on the physical device (e.g., typo in the template, device firmware changed, device still booting):
1. The failure is **logged to Site Event Logging**.
2. The attribute is marked with quality `bad`.
3. The DCL **periodically retries resolution** at a configurable interval, accommodating devices that come online in stages or load modules after startup.
4. On successful resolution, the subscription activates normally and quality reflects the live value from the device.
Note: Pre-deployment validation at central does **not** verify that tag paths resolve to real tags on physical devices — that is a runtime concern handled here.
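The resolution-retry loop in steps 1-4 can be sketched as follows. A hedged Python illustration of the control flow only (callback names are hypothetical; the real implementation runs inside the connection actor and uses `TagResolutionRetryInterval` rather than a sleep loop):

```python
import time

def resolve_with_retry(try_subscribe, tag_path, retry_interval_s,
                       mark_bad, max_attempts=None):
    """Retry tag-path resolution at a fixed interval.

    On each failure the attribute is held at quality 'bad'; retries
    continue until the device exposes the tag (or, for this sketch,
    until an optional attempt limit is reached).
    """
    attempts = 0
    while True:
        if try_subscribe(tag_path):
            return True                # subscription active; quality goes live
        mark_bad(tag_path)             # step 2: attribute marked quality 'bad'
        attempts += 1
        if max_attempts is not None and attempts >= max_attempts:
            return False
        time.sleep(retry_interval_s)   # step 3: fixed-interval retry
```

The fixed interval (rather than exponential backoff) is a deliberate fit for the stated scenario: devices that boot in stages or load modules after startup become resolvable within a bounded, predictable delay.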
## Health Reporting
The DCL reports the following metrics to the Health Monitoring component via the existing periodic heartbeat:
- **Connection status**: `connected`, `disconnected`, or `reconnecting` per data connection.
- **Tag resolution counts**: Per connection, the number of total subscribed tags vs. successfully resolved tags. This gives operators visibility into misconfigured templates without needing to open the debug view for individual instances.
## Dependencies
- **Site Runtime (Instance Actors)**: Receives subscription registrations and delivers value updates. Receives write requests.
- **Health Monitoring**: Reports connection status.
- **Site Event Logging**: Logs connection status changes.
## Interactions
- **Site Runtime (Instance Actors)**: Bidirectional — delivers value updates, receives subscription registrations and write-back commands.
- **Health Monitoring**: Reports connection health periodically.
- **Site Event Logging**: Logs connection/disconnection events.