ScadaBridge/docs/components/DataConnectionLayer.md
Joseph Doherty 66f0f96328 docs(components): verification pass — fix cross-link targets, tag code fences, correct type names
- Fix 15 link-text/target mismatches (ConfigurationDatabase ×8 to Commons,
  NotificationOutbox ×4, ClusterInfrastructure case, HealthMonitoring,
  SiteCallAudit) caught by a link-text-vs-target consistency check.
- Tag 14 untagged code-fence openers (ASCII diagrams/trees, JSON, HTTP).
- Correct 4 type names to match source (ValidationService, HealthReportSender,
  CentralCommunicationActor, DebugSnapshotCommand set).
- Soften Traefik version prose per the style guide.
2026-06-03 16:09:06 -04:00

# Data Connection Layer
The Data Connection Layer is the site-only clean data pipe that abstracts protocol-specific communication behind a uniform actor/interface model, delivering live tag values and native alarm transitions to Instance Actors while hiding all connection lifecycle complexity from the rest of the system.
## Overview
The Data Connection Layer (#4) runs exclusively on site nodes. Central never reads from or writes to physical devices directly. The component code lives in `src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/`, with adapters in `Adapters/` and the actor hierarchy in `Actors/`.
Key collaborators are:
- `DataConnectionManagerActor` — the site-level router. One actor per site, child of the Site Runtime hierarchy. It owns the `connectionName → IActorRef` map and routes every inbound message to the right `DataConnectionActor`.
- `DataConnectionActor` — one per configured data connection. Owns a single `IDataConnection` adapter and models the full connection lifecycle as a Become/Stash state machine.
- `IDataConnection` (Commons) — the protocol adapter contract. Implemented by `OpcUaDataConnection` and `MxGatewayDataConnection`.
- `DataConnectionFactory` / `IDataConnectionFactory` — resolves a case-insensitive `ProtocolType` string to a fresh `IDataConnection` instance.
DI registration is through `ServiceCollectionExtensions.AddDataConnectionLayer`, which binds `DataConnectionOptions` from the `DataConnectionLayer` section and `OpcUaGlobalOptions` from the `OpcUa` section. The actors themselves are created inside the ActorSystem, not by DI.
## Key Concepts
### Clean data pipe
The DCL performs no alarm evaluation, no trigger evaluation, and no business logic. Its only job is to subscribe to tag paths requested by Instance Actors, deliver `TagValueUpdate` messages when values change, and route write requests down to the device. Every value change crosses exactly one boundary: device → adapter callback → `DataConnectionActor` → `InstanceActor`. The DCL never publishes to any actor other than the Instance Actors that subscribed.
### Become/Stash lifecycle state machine
`DataConnectionActor` uses Akka.NET's `Become`/`Stash` pattern to model three states cleanly:
| State | Behaviour |
|-------|-----------|
| `Connecting` | Stashes `SubscribeTagsRequest`, `WriteTagRequest`, `UnsubscribeTagsRequest`, `SubscribeAlarmsRequest`. `BrowseNodeCommand` and `ReadTagValuesCommand` are not stashed; each gets an immediate failure reply. |
| `Connected` | Processes all message types. On entering, calls `Stash.UnstashAll()`. |
| `Reconnecting` | Stashes new subscribe/write requests; allows `UnsubscribeTagsRequest` and `UnsubscribeAlarmsRequest` through for cleanup on instance stop. |
`BecomeConnecting` fires immediately in `PreStart`. `BecomeConnected` calls `Stash.UnstashAll()`, so subscribe requests that arrived while the connection was still being established (the instance-startup race) are processed without loss. The actor declares the `IWithStash` and `IWithTimers` interfaces; Akka.NET populates the corresponding `Stash` and `Timers` properties.
### Adapter generation guard
Each `DataConnectionActor` holds an `_adapterGeneration` integer. When a new `IDataConnection` is created on failover, the generation is incremented. All subscription callbacks capture the generation at the time they are registered; a `TagValueReceived` or `AlarmTransitionReceived` whose generation does not match `_adapterGeneration` is silently dropped. This ensures stale callbacks from a disposed OPC UA SDK session or a closed gRPC stream never reach Instance Actors.
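The guard can be illustrated without Akka.NET. The following is a minimal sketch, not the actor's code: `GenerationGuard`, `Capture`, and `Rotate` are hypothetical names, and only the capture-then-compare rule comes from the description above. In the real actor the comparison runs on the actor's message loop, so no locking is involved.

```csharp
using System;
using System.Collections.Generic;

var guard = new GenerationGuard();
var delivered = new List<string>();

// A callback registered against the first adapter captures generation 0.
Action<string> oldCallback = guard.Capture<string>(delivered.Add);
oldCallback("from adapter 0");        // delivered

guard.Rotate();                       // failover: a new adapter is created
Action<string> newCallback = guard.Capture<string>(delivered.Add);

oldCallback("late, from adapter 0");  // dropped: captured 0, current 1
newCallback("from adapter 1");        // delivered

Console.WriteLine(string.Join(", ", delivered));
// prints: from adapter 0, from adapter 1

// Hypothetical helper that models the _adapterGeneration check.
sealed class GenerationGuard
{
    private int _generation;

    // Called whenever the adapter is replaced.
    public void Rotate() => _generation++;

    // Wraps a handler so it only runs while its captured generation is current.
    public Action<T> Capture<T>(Action<T> handler)
    {
        int captured = _generation;
        return value =>
        {
            if (captured != _generation) return; // stale callback: drop silently
            handler(value);
        };
    }
}
```

Capturing the generation by value at registration time is what makes the check cheap: a stale callback needs no unsubscribe bookkeeping, it simply fails the comparison.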
### Protocol extensibility
Adding a new protocol requires implementing `IDataConnection` (and optionally `IBrowsableDataConnection` and/or `IAlarmSubscribableConnection`) and calling `DataConnectionFactory.RegisterAdapter` with the new `ProtocolType` string. The manager and the actor are protocol-agnostic — they always work through the interfaces.
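The registration idea can be sketched as a case-insensitive map from protocol name to a creator delegate. This is an illustration only: the real signature of `DataConnectionFactory.RegisterAdapter` is not shown in this document, so `AdapterRegistry`, `IFakeDataConnection`, `FakeConnection`, and the `"Modbus"` protocol name are all hypothetical stand-ins.

```csharp
using System;
using System.Collections.Generic;

var registry = new AdapterRegistry();
registry.Register("Modbus", details => new FakeConnection("modbus:" + details["host"]));

// Lookup ignores case, so "MODBUS" resolves to the "Modbus" registration.
var conn = registry.Create("MODBUS", new Dictionary<string, string> { ["host"] = "plc1" });
Console.WriteLine(conn.Describe()); // prints: modbus:plc1

// Stand-in for the IDataConnection contract (hypothetical, one method only).
interface IFakeDataConnection { string Describe(); }

sealed record FakeConnection(string Target) : IFakeDataConnection
{
    public string Describe() => Target;
}

sealed class AdapterRegistry
{
    private readonly Dictionary<string, Func<IDictionary<string, string>, IFakeDataConnection>> _creators
        = new(StringComparer.OrdinalIgnoreCase);

    public void Register(string protocolType, Func<IDictionary<string, string>, IFakeDataConnection> creator)
        => _creators[protocolType] = creator;

    // Each call invokes the creator again, so every caller gets a fresh adapter.
    public IFakeDataConnection Create(string protocolType, IDictionary<string, string> details)
        => _creators.TryGetValue(protocolType, out var create)
            ? create(details)
            : throw new ArgumentException($"Unknown protocol type '{protocolType}'.");
}
```

Storing a delegate rather than an instance is what lets failover ask for a brand-new adapter for the other endpoint.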
## Architecture
### Actor hierarchy
```text
DataConnectionManagerActor (one per site; child of DeploymentManagerActor)
├── DataConnectionActor "OpcSrv1" (one per configured connection)
├── DataConnectionActor "Gateway2"
└── ...
```
`DataConnectionManagerActor` is a `ReceiveActor`. It routes by connection name: `HandleRoute` calls `actor.Forward(request)` when the connection exists or sends a typed failure reply when it does not. The manager owns `ConnectionNotFound` failures; `DataConnectionActor` owns everything else (not connected, server errors, capability checks). The manager's `SupervisorStrategy` is `Resume` with up to ten restarts per minute, so a transient exception on a child never collapses the routing table.
`CreateConnectionCommand` (from the Site Runtime's `DeploymentManagerActor`) triggers `HandleCreateConnection`. The actor name is sanitised from the connection's display name to satisfy Akka's path character constraints.
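As a rough illustration of the sanitising step, the sketch below maps a display name onto a deliberately conservative character set (ASCII letters, digits, `-`, `_`). `ActorNames.Sanitise` is a hypothetical helper, not the method used in `HandleCreateConnection`; Akka.NET accepts more characters than this subset, and the real rules may differ.

```csharp
using System;
using System.Text;

Console.WriteLine(ActorNames.Sanitise("OPC Srv #1 (east)"));

static class ActorNames
{
    // Replaces every character outside [A-Za-z0-9_-] with '_'.
    public static string Sanitise(string displayName)
    {
        var sb = new StringBuilder(displayName.Length);
        foreach (char c in displayName)
        {
            bool safe = (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z')
                     || (c >= '0' && c <= '9') || c == '-' || c == '_';
            sb.Append(safe ? c : '_');
        }
        // An actor name cannot be empty, so fall back to a placeholder.
        return sb.Length == 0 ? "_" : sb.ToString();
    }
}
```

A mapping like this is lossy: two display names that differ only in replaced characters produce the same actor name, so a real implementation needs some way to keep names unique.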
### Connection state machine
```csharp
// DataConnectionActor.cs: PreStart and state transitions
protected override void PreStart()
{
    _self = Self; // capture for off-thread use
    _adapter.Disconnected += OnAdapterDisconnected;
    BecomeConnecting();
}

private void OnAdapterDisconnected()
{
    // fired from a background thread (gRPC stream, OPC UA keep-alive timer)
    _self.Tell(new AdapterDisconnected()); // marshal onto actor loop
}

private void BecomeConnecting()
{
    _healthCollector.UpdateConnectionHealth(_connectionName, ConnectionHealth.Connecting);
    Become(Connecting);
    Self.Tell(new AttemptConnect());
}

private void BecomeConnected()
{
    _lastConnectedAt = DateTimeOffset.UtcNow;
    _healthCollector.UpdateConnectionHealth(_connectionName, ConnectionHealth.Connected);
    Become(Connected);
    Stash.UnstashAll();
}

private void BecomeReconnecting()
{
    _healthCollector.UpdateConnectionHealth(_connectionName, ConnectionHealth.Disconnected);
    Become(Reconnecting);
    PushBadQualityForAllTags();   // immediate bad quality; Instance Actors see it now
    PushAlarmSourceUnavailable(); // native alarm subscribers notified
    Timers.StartSingleTimer("reconnect", new AttemptConnect(), _options.ReconnectInterval);
}
```
`AttemptConnect` fires `_adapter.ConnectAsync(...)` and pipes `ConnectResult` back to the actor via `PipeTo`. On success, `BecomeConnected` is called; on failure, `_consecutiveFailures` is incremented and the reconnect timer is re-armed.
### Transparent re-subscribe
`ReSubscribeAll` runs immediately after a successful reconnect, before `BecomeConnected`. It derives the tag list from `_subscriptionsByInstance` (the durable record of what every Instance Actor asked for) rather than from `_subscriptionIds` (which are adapter handles cleared on every disconnect). All in-flight and unresolved sets are cleared so the new adapter starts clean. Subscriptions are re-issued in parallel via `PipeTo`, so the actor is never blocked during re-subscribe.
```csharp
// DataConnectionActor.cs: HandleReconnectResult
if (result.Success)
{
    _consecutiveFailures = 0;
    ReSubscribeAll();       // tag subscriptions
    ReSubscribeAllAlarms(); // native alarm feeds
    BecomeConnected();
}
```
### Failover between primary and backup endpoints
When `BackupConnectionDetails` is provided, the actor tracks consecutive failures and unstable disconnects. After `FailoverRetryCount` consecutive failures (or unstable connections shorter than `StableConnectionThreshold`), the actor disposes the current adapter, creates a fresh one with the other endpoint's config via `DataConnectionFactory.Create`, and re-arms the connect timer. Failover is round-robin: primary → backup → primary. There is no auto-failback; the connection stays on whichever endpoint is currently working.
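The switching rule can be sketched as a small policy object. This is a simplified model under stated assumptions, not the actor's implementation: `FailoverPolicy` and its method names are hypothetical, it assumes either counter reaching `FailoverRetryCount` triggers the switch, and it assumes both counters reset on a switch and that a stable disconnect resets the unstable counter.

```csharp
using System;

var policy = new FailoverPolicy(failoverRetryCount: 3,
    stableThreshold: TimeSpan.FromMinutes(1), hasBackup: true);

policy.OnConnectFailed();
policy.OnConnectFailed();
bool switched = policy.OnConnectFailed(); // third consecutive failure
Console.WriteLine($"switched={switched}, onBackup={policy.OnBackup}");
// prints: switched=True, onBackup=True

sealed class FailoverPolicy
{
    private readonly int _retryCount;
    private readonly TimeSpan _stableThreshold;
    private readonly bool _hasBackup;
    private int _consecutiveFailures;
    private int _consecutiveUnstableDisconnects;

    public bool OnBackup { get; private set; }

    public FailoverPolicy(int failoverRetryCount, TimeSpan stableThreshold, bool hasBackup)
    {
        _retryCount = failoverRetryCount;
        _stableThreshold = stableThreshold;
        _hasBackup = hasBackup;
    }

    public void OnConnected() => _consecutiveFailures = 0;

    // Returns true when the caller should switch to the other endpoint.
    public bool OnConnectFailed()
    {
        _consecutiveFailures++;
        return MaybeSwitch();
    }

    public bool OnDisconnected(TimeSpan connectedFor)
    {
        if (connectedFor >= _stableThreshold) _consecutiveUnstableDisconnects = 0; // assumption
        else _consecutiveUnstableDisconnects++;
        return MaybeSwitch();
    }

    private bool MaybeSwitch()
    {
        if (!_hasBackup) return false; // single endpoint: keep retrying it
        if (_consecutiveFailures < _retryCount &&
            _consecutiveUnstableDisconnects < _retryCount) return false;
        OnBackup = !OnBackup;          // alternate; there is no automatic failback
        _consecutiveFailures = 0;
        _consecutiveUnstableDisconnects = 0;
        return true;
    }
}
```

Because the policy only toggles between two endpoints, "round-robin" here reduces to alternation, and the connection stays wherever the last successful connect happened.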
### Write path
Writes are fire-and-forget only in the sense that there is no store-and-forward; the caller still gets a result for every write. The `DataConnectionActor` runs `_adapter.WriteAsync` under `WriteTimeout` and pipes the resulting `WriteTagResponse` back to the original sender:
```csharp
// DataConnectionActor.cs: HandleWrite
private void HandleWrite(WriteTagRequest request)
{
    var sender = Sender; // capture before leaving the actor thread
    var cts = new CancellationTokenSource(_options.WriteTimeout);
    _adapter.WriteAsync(request.TagPath, request.Value, cts.Token).ContinueWith(t =>
    {
        cts.Dispose();
        if (t.IsCompletedSuccessfully)
            return new WriteTagResponse(request.CorrelationId, t.Result.Success,
                t.Result.ErrorMessage, DateTimeOffset.UtcNow);
        if (t.IsCanceled || t.Exception?.GetBaseException() is OperationCanceledException)
            return new WriteTagResponse(request.CorrelationId, false,
                $"Write timeout after {_options.WriteTimeout.TotalSeconds:F0}s", DateTimeOffset.UtcNow);
        return new WriteTagResponse(request.CorrelationId, false,
            t.Exception?.GetBaseException().Message, DateTimeOffset.UtcNow);
    }).PipeTo(sender);
}
```
The `WriteTagResponse` reaches the calling script through the Instance Actor, which awaits it, so the script sees success, failure, or timeout before it continues. There is no store-and-forward for writes: buffering stale setpoints for later replay is unsafe in a control context.
### Tag path resolution retry
When `_adapter.SubscribeAsync` throws a resolution-level exception (node not found, device still booting), the tag is added to `_unresolvedTags` and marked `QualityCode.Bad`. A periodic `RetryTagResolution` timer fires at `TagResolutionRetryInterval`. Only tags that are not already in `_resolutionInFlight` are dispatched on each tick, preventing duplicate concurrent subscribe calls for a slow device. When resolution succeeds, the tag moves from `_unresolvedTags` to `_subscriptionIds`, `_resolvedTags` is incremented, and the timer is cancelled once the set is empty.
A separate in-flight set `_subscribesInFlight` tracks the initial `SubscribeAsync` for newly requested tags. When two `SubscribeTagsRequest` messages for different instances share a tag path, the second observes the tag as already in flight, so only one adapter subscribe is issued.
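The de-duplication and retry bookkeeping can be modelled with plain sets. The sketch below is a simplification under stated assumptions: `SubscribeTracker` and its methods are hypothetical, and it folds `_subscribesInFlight` and `_resolutionInFlight` into a single in-flight set for brevity.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var tracker = new SubscribeTracker();
Console.WriteLine(tracker.TryBegin("Pump1.Speed"));   // True: first request issues the subscribe
Console.WriteLine(tracker.TryBegin("Pump1.Speed"));   // False: already in flight
tracker.Complete("Pump1.Speed", resolved: false);     // device could not resolve the tag
Console.WriteLine(tracker.BeginRetryTick().Count);    // 1: dispatched on this tick
Console.WriteLine(tracker.BeginRetryTick().Count);    // 0: still in flight, not re-dispatched

sealed class SubscribeTracker
{
    private readonly HashSet<string> _subscribed = new();
    private readonly HashSet<string> _inFlight = new();
    private readonly HashSet<string> _unresolved = new();

    // True only when the caller should issue an adapter subscribe for this tag.
    public bool TryBegin(string tagPath)
    {
        if (_subscribed.Contains(tagPath) || _unresolved.Contains(tagPath)) return false;
        return _inFlight.Add(tagPath); // Add returns false if already present
    }

    // Records the outcome of a subscribe attempt.
    public void Complete(string tagPath, bool resolved)
    {
        _inFlight.Remove(tagPath);
        if (resolved) { _unresolved.Remove(tagPath); _subscribed.Add(tagPath); }
        else _unresolved.Add(tagPath);
    }

    // Tags a retry tick dispatches: unresolved and not already in flight.
    public List<string> BeginRetryTick()
    {
        var batch = _unresolved.Where(t => !_inFlight.Contains(t)).ToList();
        foreach (var tag in batch) _inFlight.Add(tag);
        return batch;
    }
}
```

The second retry tick returning an empty batch is the behaviour that protects a slow device from duplicate concurrent subscribe calls.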
### Tag subscriber reference counting
`_tagSubscriberCount` maps each tag path to the number of instances subscribed to it. When an instance unsubscribes, the count is decremented. The adapter `UnsubscribeAsync` call and the quality / resolution counters are updated only when the count reaches zero. This means a shared tag path (subscribed by two different instances bound to the same connection) remains active at the adapter until both instances stop.
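A minimal sketch of the counting rule follows. `TagRefCounter` is a hypothetical helper; the only behaviour taken from the description above is that the adapter-level unsubscribe happens when the count reaches zero.

```csharp
using System;
using System.Collections.Generic;

var counts = new TagRefCounter();
Console.WriteLine(counts.Add("Pump1.Speed"));     // True: first subscriber
Console.WriteLine(counts.Add("Pump1.Speed"));     // False: shared by a second instance
Console.WriteLine(counts.Release("Pump1.Speed")); // False: one instance still subscribed
Console.WriteLine(counts.Release("Pump1.Speed")); // True: last one left, unsubscribe at the adapter

sealed class TagRefCounter
{
    private readonly Dictionary<string, int> _counts = new();

    // Returns true when this is the first subscriber for the tag.
    public bool Add(string tagPath)
    {
        _counts.TryGetValue(tagPath, out int current); // current is 0 when absent
        _counts[tagPath] = current + 1;
        return current == 0;
    }

    // Returns true when the last subscriber has gone and the adapter should unsubscribe.
    public bool Release(string tagPath)
    {
        if (!_counts.TryGetValue(tagPath, out int current)) return false;
        if (current > 1) { _counts[tagPath] = current - 1; return false; }
        _counts.Remove(tagPath);
        return true;
    }
}
```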
## Usage
### Creating a connection
The Site Runtime's `DeploymentManagerActor` sends `CreateConnectionCommand` to `DataConnectionManagerActor` during instance deployment:
```csharp
// Commons/Messages/DataConnection/CreateConnectionCommand.cs
public record CreateConnectionCommand(
    string ConnectionName,
    string ProtocolType,
    IDictionary<string, string> PrimaryConnectionDetails,
    IDictionary<string, string>? BackupConnectionDetails = null,
    int FailoverRetryCount = 3);
```
`DataConnectionManagerActor.HandleCreateConnection` calls `DataConnectionFactory.Create(ProtocolType, PrimaryConnectionDetails)` to produce the initial `IDataConnection` adapter and spawns a `DataConnectionActor` child.
### Subscribing and receiving values
Instance Actors send `SubscribeTagsRequest` and receive `TagValueUpdate` messages. The actor sends `TagValueUpdate` to the Instance Actor's ref for every value change notification from the adapter. On disconnect, `ConnectionQualityChanged` (with `QualityCode.Bad`) is sent to all subscribers so Instance Actors reflect staleness immediately without waiting for a per-tag callback.
### Browse (design-time only)
`BrowseNodeCommand` is routed to the named `DataConnectionActor`. If the adapter implements `IBrowsableDataConnection`, the actor calls `BrowseChildrenAsync` and pipes `BrowseNodeResult` back. The result is capped to ~100 KB before reply to stay within the Akka remote frame budget on the site→central crossing. Browse is never called on the hot path and Instance Actors never use it.
### Native alarm subscription
`NativeAlarmActor` (Site Runtime) sends `SubscribeAlarmsRequest` to the `DataConnectionManagerActor`, which forwards it to the named `DataConnectionActor`:
```csharp
// DataConnectionActor.cs: HandleSubscribeAlarms (key excerpt)
if (_adapter is not IAlarmSubscribableConnection alarmable)
{
    subscriber.Tell(new SubscribeAlarmsResponse(
        request.CorrelationId, request.InstanceUniqueName, false,
        $"Connection '{_connectionName}' is not alarm-capable.", now));
    return;
}

// register the subscriber for routing before issuing the adapter call
_alarmSourceSubscribers[request.SourceReference].Add(subscriber);

// open one feed per source reference; subsequent subscribers reuse it
alarmable.SubscribeAlarmsAsync(sourceRef, filter,
        t => self.Tell(new AlarmTransitionReceived(t, generation)))
    .ContinueWith(task => ...)
    .PipeTo(self);
```
Incoming `AlarmTransitionReceived` is routed to all subscribers whose registered `SourceReference` is a prefix of the transition's `SourceObjectReference` or `SourceReference`. On disconnect, `PushAlarmSourceUnavailable` sends `NativeAlarmSourceUnavailable` to every alarm subscriber; on reconnect, `ReSubscribeAllAlarms` re-opens the feeds and the adapter replays a snapshot of currently-active conditions.
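The prefix-based routing can be sketched as follows. `AlarmRouter` is a hypothetical stand-in that uses subscriber names instead of actor refs, and it assumes an ordinal, plain string prefix test; the actual comparison rules are not shown in this document.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var router = new AlarmRouter();
router.Subscribe("Line1", "instanceB");
router.Subscribe("Line1.Tank", "instanceA");
router.Subscribe("Line2", "instanceC");

// Both "Line1" and "Line1.Tank" are prefixes of the transition's source.
Console.WriteLine(string.Join(", ", router.Match("Line1.Tank.HiHi")));
// prints: instanceA, instanceB

sealed class AlarmRouter
{
    private readonly Dictionary<string, HashSet<string>> _subscribers = new(StringComparer.Ordinal);

    public void Subscribe(string sourceReference, string subscriber)
    {
        if (!_subscribers.TryGetValue(sourceReference, out var set))
            _subscribers[sourceReference] = set = new HashSet<string>();
        set.Add(subscriber);
    }

    // Every subscriber whose registered reference is a prefix of the source, sorted for stable output.
    public IReadOnlyList<string> Match(string transitionSource) =>
        _subscribers
            .Where(kv => transitionSource.StartsWith(kv.Key, StringComparison.Ordinal))
            .SelectMany(kv => kv.Value)
            .Distinct()
            .OrderBy(name => name, StringComparer.Ordinal)
            .ToList();
}
```

Note that a plain `StartsWith` test would also match `Line1` against a source such as `Line10.Valve`; whether the real routing is segment-aware is not stated here.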
## Configuration
All settings under `DataConnectionLayer` in `appsettings.json` bind to `DataConnectionOptions`. Global protocol settings (`OpcUa`, `MxGateway` sections) bind to `OpcUaGlobalOptions` and `MxGatewayGlobalOptions`.
### Shared settings (`DataConnectionLayer` section)
| Key | Default | Description |
|-----|---------|-------------|
| `ReconnectInterval` | `00:00:05` | Fixed interval between reconnection attempts after a disconnect or failed connect. |
| `TagResolutionRetryInterval` | `00:00:10` | Retry interval for tag paths that could not be resolved on the device. |
| `WriteTimeout` | `00:00:30` | Timeout applied to each `WriteAsync` call. A hung write times out and returns an error to the calling script. |
| `StableConnectionThreshold` | `00:01:00` | A connection that drops before this duration is counted as an unstable disconnect toward `FailoverRetryCount`. |
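As an illustration, the defaults in the table above would look like this in `appsettings.json`. This fragment assumes the options are `TimeSpan` properties bound through the standard .NET configuration binder, which accepts `hh:mm:ss` strings; the values only restate the defaults, so any key can be omitted.

```json
{
  "DataConnectionLayer": {
    "ReconnectInterval": "00:00:05",
    "TagResolutionRetryInterval": "00:00:10",
    "WriteTimeout": "00:00:30",
    "StableConnectionThreshold": "00:01:00"
  }
}
```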
### OPC UA global settings (`OpcUa` section)
| Key | Default | Description |
|-----|---------|-------------|
| `ApplicationName` | `ScadaBridge-DCL` | Application name used in the OPC UA certificate and session negotiation. |
| `TrustedIssuerStorePath` | `""` | File path to the trusted issuer certificate store. |
| `TrustedPeerStorePath` | `""` | File path to the trusted peer certificate store. |
| `RejectedCertificateStorePath` | `""` | File path to the rejected certificate store. |
Empty store paths fall back to a temp-path default so dev runs work without explicit configuration.
### Per-connection settings (OPC UA)
Stored in the `DataConnection.PrimaryConfiguration` JSON dict, parsed by `OpcUaEndpointConfigSerializer.FromFlatDict`.
| Key | Default | Description |
|-----|---------|-------------|
| `endpoint` / `EndpointUrl` | `opc.tcp://localhost:4840` | OPC UA server endpoint URL. |
| `SessionTimeoutMs` | `60000` | Session timeout in ms. |
| `OperationTimeoutMs` | `15000` | Transport operation timeout in ms. |
| `PublishingIntervalMs` | `1000` | Subscription publishing interval in ms. |
| `KeepAliveCount` | `10` | Keep-alive frames before session timeout. |
| `LifetimeCount` | `30` | Subscription lifetime in publish intervals. |
| `SamplingIntervalMs` | `1000` | Per-item server sampling rate in ms. |
| `QueueSize` | `10` | Per-item notification buffer size. |
| `SecurityMode` | `None` | `None`, `Sign`, or `SignAndEncrypt`. |
| `AutoAcceptUntrustedCerts` | `true` | Accept untrusted server certificates. |
### Per-connection settings (MxGateway)
Stored in `DataConnection.PrimaryConfiguration`, parsed by `MxGatewayEndpointConfigSerializer.FromFlatDict`.
| Key | Default | Description |
|-----|---------|-------------|
| `Endpoint` | `http://localhost:5000` | Gateway base URL. |
| `ApiKey` | — | Sent as `Authorization: Bearer <key>`. Redacted in logs. |
| `ClientName` | `scadabridge` | MXAccess client registration name. |
| `WriteUserId` | `0` | MXAccess user id on writes. `0` = no user context. |
| `UseTls` | `false` | Use TLS. |
| `CaFile` | — | CA certificate file path (TLS only). |
| `ServerName` | — | TLS server-name override. |
| `ReadTimeoutMs` | `5000` | `ReadBulk` per-call timeout in ms. |
### Per-connection failover settings
Carried on `CreateConnectionCommand` from the deployment artifact.
| Field | Default | Description |
|-------|---------|-------------|
| `BackupConnectionDetails` | `null` | If absent, the connection retries its single endpoint indefinitely. |
| `FailoverRetryCount` | `3` | Consecutive failures (or unstable disconnects) before switching endpoints. |
## Dependencies & Interactions
- [Site Runtime (#3)](./SiteRuntime.md) — `DeploymentManagerActor` sends `CreateConnectionCommand` / `RemoveConnectionCommand` to create and tear down connections during instance deployment. Instance Actors send `SubscribeTagsRequest` / `UnsubscribeTagsRequest` / `WriteTagRequest`. `NativeAlarmActor` (peer to `AlarmActor` under `InstanceActor`) sends `SubscribeAlarmsRequest` / `UnsubscribeAlarmsRequest` and receives `NativeAlarmTransitionUpdate` / `NativeAlarmSourceUnavailable`.
- [Commons (#16)](./Commons.md) — owns `IDataConnection`, `IAlarmSubscribableConnection`, `IBrowsableDataConnection` interfaces; `TagValue`, `QualityCode`, `NativeAlarmTransition`, `AlarmConditionState`, `AlarmTransitionKind` types; and all message contracts (`SubscribeTagsRequest/Response`, `TagValueUpdate`, `WriteTagRequest/Response`, `SubscribeAlarmsRequest/Response`, `NativeAlarmTransitionUpdate`, `NativeAlarmSourceUnavailable`, `CreateConnectionCommand`, `BrowseNodeCommand/Result`, `DataConnectionHealthReport`).
- [Health Monitoring (#11)](./HealthMonitoring.md) — `DataConnectionActor` calls `ISiteHealthCollector.UpdateConnectionHealth`, `UpdateTagResolution`, `UpdateTagQuality`, `UpdateConnectionEndpoint`, and `RemoveConnection` to keep the site health report current. `DataConnectionManagerActor` handles `GetAllHealthReports` by forwarding `GetHealthReport` to each child.
- [Site Event Logging (#12)](./SiteEventLogging.md) — `DataConnectionActor` logs connection lost, restored, and failover events via `ISiteEventLogger.LogEventAsync` (fire-and-forget). Absent in tests where `ISiteEventLogger` is null.
- Design spec: [Component-DataConnectionLayer.md](../requirements/Component-DataConnectionLayer.md).
## Troubleshooting
### Tags stuck in bad quality after reconnect
After a reconnect, `ReSubscribeAll` re-issues all subscriptions. A tag that resolves successfully is seeded with a value from `ReadAsync` right away, before the subscription's first change notification arrives, so the Instance Actor sees a good-quality value promptly. Tags that fail resolution land in `_unresolvedTags` and are retried at `TagResolutionRetryInterval`. If a tag is persistently bad, check whether the tag path exists on the device and whether the device is online. The health report's `ResolvedTags` vs `TotalSubscribedTags` counters expose the gap without requiring the debug view.
### Connection not failing over to backup
Failover requires `BackupConnectionDetails`; without it the connection retries its single endpoint indefinitely. Two independent counters can trigger a switch: `_consecutiveFailures` for failed connect attempts, and `_consecutiveUnstableDisconnects` for connections that drop before `StableConnectionThreshold`. A switch happens when either counter reaches `FailoverRetryCount`. A connection that succeeds but immediately drops does not increment `_consecutiveFailures`; it increments `_consecutiveUnstableDisconnects` instead, so check which counter matches the observed failure pattern. Failover events are written to Site Event Logging as `Warning` entries.
### Browse hangs on "loading…"
The `DataConnectionActor` caps `BrowseNodeResult` to ~100 KB before returning it. If the picker hangs rather than showing a "results truncated" hint, the reply may have been silently discarded by Akka remoting before reaching central (the reply crosses the site→central remoting boundary and is subject to its frame budget). Also check whether the connection actor is in `Connecting` or `Reconnecting` state: browse while disconnected returns `BrowseFailureKind.ConnectionNotConnected` immediately rather than hanging.
### Alarm feed not receiving transitions
Confirm that the connection's adapter implements `IAlarmSubscribableConnection` (currently `OpcUaDataConnection` and `MxGatewayDataConnection`). A non-capable adapter returns `SubscribeAlarmsResponse(Success = false)`. If the capability is present, check whether the connection is in `Connected` state — `SubscribeAlarmsRequest` is stashed while `Connecting` or `Reconnecting` and processed on entering `Connected`. On reconnect, `ReSubscribeAllAlarms` re-opens the feeds and the adapter replays a snapshot.
## Related Documentation
- [Data Connection Layer design specification](../requirements/Component-DataConnectionLayer.md)
- [Site Runtime](./SiteRuntime.md)
- [Commons](./Commons.md)
- [Health Monitoring](./HealthMonitoring.md)
- [Site Event Logging](./SiteEventLogging.md)
- [Central-Site Communication](./Communication.md)