docs(components): reference docs batch 2/4 — TemplateEngine, DeploymentManager, SiteRuntime, DataConnectionLayer, StoreAndForward, ExternalSystemGateway

This commit is contained in:
Joseph Doherty
2026-06-03 15:47:16 -04:00
parent b89611464b
commit 8fb90ba400
6 changed files with 1644 additions and 0 deletions
+309
View File
@@ -0,0 +1,309 @@
# Data Connection Layer
The Data Connection Layer is the site-only clean data pipe that abstracts protocol-specific communication behind a uniform actor/interface model, delivering live tag values and native alarm transitions to Instance Actors while hiding all connection lifecycle complexity from the rest of the system.
## Overview
The Data Connection Layer (#4) runs exclusively on site nodes. Central never reads from or writes to physical devices directly. The component code lives in `src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/`, with adapters in `Adapters/` and the actor hierarchy in `Actors/`.
Key collaborators are:
- `DataConnectionManagerActor` — the site-level router. One actor per site, child of the Site Runtime hierarchy. It owns the `connectionName → IActorRef` map and routes every inbound message to the right `DataConnectionActor`.
- `DataConnectionActor` — one per configured data connection. Owns a single `IDataConnection` adapter and models the full connection lifecycle as a Become/Stash state machine.
- `IDataConnection` (Commons) — the protocol adapter contract. Implemented by `OpcUaDataConnection` and `MxGatewayDataConnection`.
- `DataConnectionFactory` / `IDataConnectionFactory` — resolves a case-insensitive `ProtocolType` string to a fresh `IDataConnection` instance.
DI registration is through `ServiceCollectionExtensions.AddDataConnectionLayer`, which binds `DataConnectionOptions` from the `DataConnectionLayer` section and `OpcUaGlobalOptions` from the `OpcUa` section. The actors themselves are created inside the ActorSystem, not by DI.
## Key Concepts
### Clean data pipe
The DCL performs no alarm evaluation, no trigger evaluation, and no business logic. Its only job is to subscribe to tag paths requested by Instance Actors, deliver `TagValueUpdate` messages when values change, and route write requests down to the device. Every value change crosses exactly one boundary: device → adapter callback → `DataConnectionActor``InstanceActor`. The DCL never publishes to any actor other than the Instance Actors that subscribed.
### Become/Stash lifecycle state machine
`DataConnectionActor` uses Akka.NET's `Become`/`Stash` pattern to model three states cleanly:
| State | Behaviour |
|-------|-----------|
| `Connecting` | Stashes `SubscribeTagsRequest`, `WriteTagRequest`, `UnsubscribeTagsRequest`, `SubscribeAlarmsRequest`. Non-blocking for `BrowseNodeCommand` and `ReadTagValuesCommand` (immediate failure reply). |
| `Connected` | Processes all message types. On entering, calls `Stash.UnstashAll()`. |
| `Reconnecting` | Stashes new subscribe/write requests; allows `UnsubscribeTagsRequest` and `UnsubscribeAlarmsRequest` through for cleanup on instance stop. |
`BecomeConnecting` fires immediately in `PreStart`. `BecomeConnected` also calls `Stash.UnstashAll()` so queued subscribe requests from instance startup race are processed without any loss. The `IWithStash` and `IWithTimers` interfaces are declared on the actor; both are injected by the Akka.NET infrastructure.
### Adapter generation guard
Each `DataConnectionActor` holds an `_adapterGeneration` integer. When a new `IDataConnection` is created on failover, the generation is incremented. All subscription callbacks capture the generation at the time they are registered; a `TagValueReceived` or `AlarmTransitionReceived` whose generation does not match `_adapterGeneration` is silently dropped. This ensures stale callbacks from a disposed OPC UA SDK session or a closed gRPC stream never reach Instance Actors.
### Protocol extensibility
Adding a new protocol requires implementing `IDataConnection` (and optionally `IBrowsableDataConnection` and/or `IAlarmSubscribableConnection`) and calling `DataConnectionFactory.RegisterAdapter` with the new `ProtocolType` string. The manager and the actor are protocol-agnostic — they always work through the interfaces.
## Architecture
### Actor hierarchy
```
DataConnectionManagerActor (one per site; child of DeploymentManagerActor)
├── DataConnectionActor "OpcSrv1" (one per configured connection)
├── DataConnectionActor "Gateway2"
└── ...
```
`DataConnectionManagerActor` is a `ReceiveActor`. It routes by connection name: `HandleRoute` calls `actor.Forward(request)` when the connection exists or sends a typed failure reply when it does not. The manager owns `ConnectionNotFound` failures; `DataConnectionActor` owns everything else (not connected, server errors, capability checks). The manager's `SupervisorStrategy` is `Resume` with up to ten restarts per minute, so a transient exception on a child never collapses the routing table.
`CreateConnectionCommand` (from the Site Runtime's `DeploymentManagerActor`) triggers `HandleCreateConnection`. The actor name is sanitised from the connection's display name to satisfy Akka's path character constraints.
### Connection state machine
```csharp
// DataConnectionActor.cs — PreStart and state transitions
protected override void PreStart()
{
_self = Self; // capture for off-thread use
_adapter.Disconnected += OnAdapterDisconnected;
BecomeConnecting();
}
private void OnAdapterDisconnected()
{
// fired from a background thread (gRPC stream, OPC UA keep-alive timer)
_self.Tell(new AdapterDisconnected()); // marshal onto actor loop
}
private void BecomeConnecting()
{
_healthCollector.UpdateConnectionHealth(_connectionName, ConnectionHealth.Connecting);
Become(Connecting);
Self.Tell(new AttemptConnect());
}
private void BecomeConnected()
{
_lastConnectedAt = DateTimeOffset.UtcNow;
_healthCollector.UpdateConnectionHealth(_connectionName, ConnectionHealth.Connected);
Become(Connected);
Stash.UnstashAll();
}
private void BecomeReconnecting()
{
_healthCollector.UpdateConnectionHealth(_connectionName, ConnectionHealth.Disconnected);
Become(Reconnecting);
PushBadQualityForAllTags(); // immediate bad quality — Instance Actors see it now
PushAlarmSourceUnavailable(); // native alarm subscribers notified
Timers.StartSingleTimer("reconnect", new AttemptConnect(), _options.ReconnectInterval);
}
```
`AttemptConnect` fires `_adapter.ConnectAsync(...)` and pipes `ConnectResult` back to the actor via `PipeTo`. On success, `BecomeConnected` is called; on failure, the counter increments and the reconnect timer is re-armed.
### Transparent re-subscribe
`ReSubscribeAll` runs immediately after a successful reconnect, before `BecomeConnected`. It derives the tag list from `_subscriptionsByInstance` (the durable record of what every Instance Actor asked for) rather than from `_subscriptionIds` (which are adapter handles cleared on every disconnect). All in-flight and unresolved sets are cleared so the new adapter starts clean. Subscriptions are re-issued in parallel via `PipeTo`, so the actor is never blocked during re-subscribe.
```csharp
// DataConnectionActor.cs — HandleReconnectResult
if (result.Success)
{
_consecutiveFailures = 0;
ReSubscribeAll(); // tag subscriptions
ReSubscribeAllAlarms(); // native alarm feeds
BecomeConnected();
}
```
### Failover between primary and backup endpoints
When `BackupConnectionDetails` is provided, the actor tracks consecutive failures and unstable disconnects. After `FailoverRetryCount` consecutive failures (or unstable connections shorter than `StableConnectionThreshold`), the actor disposes the current adapter, creates a fresh one with the other endpoint's config via `DataConnectionFactory.Create`, and re-arms the connect timer. Failover is round-robin: primary → backup → primary. There is no auto-failback; the connection stays on whichever endpoint is currently working.
### Write path
Writes are fire-and-forget from the script's view only in the sense that there is no store-and-forward. The `DataConnectionActor` pipes `_adapter.WriteAsync` back to the original sender:
```csharp
// DataConnectionActor.cs — HandleWrite
private void HandleWrite(WriteTagRequest request)
{
var sender = Sender;
var cts = new CancellationTokenSource(_options.WriteTimeout);
_adapter.WriteAsync(request.TagPath, request.Value, cts.Token).ContinueWith(t =>
{
cts.Dispose();
if (t.IsCompletedSuccessfully)
return new WriteTagResponse(request.CorrelationId, t.Result.Success,
t.Result.ErrorMessage, DateTimeOffset.UtcNow);
if (t.IsCanceled || t.Exception?.GetBaseException() is OperationCanceledException)
return new WriteTagResponse(request.CorrelationId, false,
$"Write timeout after {_options.WriteTimeout.TotalSeconds:F0}s", DateTimeOffset.UtcNow);
return new WriteTagResponse(request.CorrelationId, false,
t.Exception?.GetBaseException().Message, DateTimeOffset.UtcNow);
}).PipeTo(sender);
}
```
`WriteTagResponse` is returned synchronously to the calling script (via the Instance Actor, which awaits it). There is no store-and-forward for writes — buffering stale setpoints for later replay is unsafe in a control context.
### Tag path resolution retry
When `_adapter.SubscribeAsync` throws a resolution-level exception (node not found, device still booting), the tag is added to `_unresolvedTags` and marked `QualityCode.Bad`. A periodic `RetryTagResolution` timer fires at `TagResolutionRetryInterval`. Only tags that are not already in `_resolutionInFlight` are dispatched on each tick, preventing duplicate concurrent subscribe calls for a slow device. When resolution succeeds, the tag moves from `_unresolvedTags` to `_subscriptionIds`, `_resolvedTags` is incremented, and the timer is cancelled once the set is empty.
A separate in-flight set `_subscribesInFlight` tracks the initial `SubscribeAsync` for newly requested tags. Two `SubscribeTagsRequest` messages arriving for different instances that share a tag path both observe the tag as already handled, so only one adapter subscribe is issued.
### Tag subscriber reference counting
`_tagSubscriberCount` maps each tag path to the number of instances subscribed to it. When an instance unsubscribes, the count is decremented. The adapter `UnsubscribeAsync` call and the quality / resolution counters are updated only when the count reaches zero. This means a shared tag path (subscribed by two different instances bound to the same connection) remains active at the adapter until both instances stop.
## Usage
### Creating a connection
The Site Runtime's `DeploymentManagerActor` sends `CreateConnectionCommand` to `DataConnectionManagerActor` during instance deployment:
```csharp
// Commons/Messages/DataConnection/CreateConnectionCommand.cs
public record CreateConnectionCommand(
string ConnectionName,
string ProtocolType,
IDictionary<string, string> PrimaryConnectionDetails,
IDictionary<string, string>? BackupConnectionDetails = null,
int FailoverRetryCount = 3);
```
`DataConnectionManagerActor.HandleCreateConnection` calls `DataConnectionFactory.Create(ProtocolType, PrimaryConnectionDetails)` to produce the initial `IDataConnection` adapter and spawns a `DataConnectionActor` child.
### Subscribing and receiving values
Instance Actors send `SubscribeTagsRequest` and receive `TagValueUpdate` messages. The actor sends `TagValueUpdate` to the Instance Actor's ref for every value change notification from the adapter. On disconnect, `ConnectionQualityChanged` (with `QualityCode.Bad`) is sent to all subscribers so Instance Actors reflect staleness immediately without waiting for a per-tag callback.
### Browse (design-time only)
`BrowseNodeCommand` is routed to the named `DataConnectionActor`. If the adapter implements `IBrowsableDataConnection`, the actor calls `BrowseChildrenAsync` and pipes `BrowseNodeResult` back. The result is capped to ~100 KB before reply to stay within the Akka remote frame budget on the site→central crossing. Browse is never called on the hot path and Instance Actors never use it.
### Native alarm subscription
`NativeAlarmActor` (Site Runtime) sends `SubscribeAlarmsRequest` to the `DataConnectionManagerActor`, which forwards it to the named `DataConnectionActor`:
```csharp
// DataConnectionActor.cs — HandleSubscribeAlarms (key excerpt)
if (_adapter is not IAlarmSubscribableConnection alarmable)
{
subscriber.Tell(new SubscribeAlarmsResponse(
request.CorrelationId, request.InstanceUniqueName, false,
$"Connection '{_connectionName}' is not alarm-capable.", now));
return;
}
// register the subscriber for routing before issuing the adapter call
_alarmSourceSubscribers[request.SourceReference].Add(subscriber);
// open one feed per source reference; subsequent subscribers reuse it
alarmable.SubscribeAlarmsAsync(sourceRef, filter,
t => self.Tell(new AlarmTransitionReceived(t, generation)))
.ContinueWith(task => ...)
.PipeTo(self);
```
Incoming `AlarmTransitionReceived` is routed to all subscribers whose registered `SourceReference` is a prefix of the transition's `SourceObjectReference` or `SourceReference`. On disconnect, `PushAlarmSourceUnavailable` sends `NativeAlarmSourceUnavailable` to every alarm subscriber; on reconnect, `ReSubscribeAllAlarms` re-opens the feeds and the adapter replays a snapshot of currently-active conditions.
## Configuration
All settings under `DataConnectionLayer` in `appsettings.json` bind to `DataConnectionOptions`. Global protocol settings (`OpcUa`, `MxGateway` sections) bind to `OpcUaGlobalOptions` and `MxGatewayGlobalOptions`.
### Shared settings (`DataConnectionLayer` section)
| Key | Default | Description |
|-----|---------|-------------|
| `ReconnectInterval` | `00:00:05` | Fixed interval between reconnection attempts after a disconnect or failed connect. |
| `TagResolutionRetryInterval` | `00:00:10` | Retry interval for tag paths that could not be resolved on the device. |
| `WriteTimeout` | `00:00:30` | Timeout applied to each `WriteAsync` call. A hung write times out and returns an error to the calling script. |
| `StableConnectionThreshold` | `00:01:00` | A connection that drops before this duration is counted as an unstable disconnect toward `FailoverRetryCount`. |
### OPC UA global settings (`OpcUa` section)
| Key | Default | Description |
|-----|---------|-------------|
| `ApplicationName` | `ScadaBridge-DCL` | Application name used in the OPC UA certificate and session negotiation. |
| `TrustedIssuerStorePath` | `""` | File path to the trusted issuer certificate store. |
| `TrustedPeerStorePath` | `""` | File path to the trusted peer certificate store. |
| `RejectedCertificateStorePath` | `""` | File path to the rejected certificate store. |
Empty store paths fall back to a temp-path default so dev runs work without explicit configuration.
### Per-connection settings (OPC UA)
Stored in the `DataConnection.PrimaryConfiguration` JSON dict, parsed by `OpcUaEndpointConfigSerializer.FromFlatDict`.
| Key | Default | Description |
|-----|---------|-------------|
| `endpoint` / `EndpointUrl` | `opc.tcp://localhost:4840` | OPC UA server endpoint URL. |
| `SessionTimeoutMs` | `60000` | Session timeout in ms. |
| `OperationTimeoutMs` | `15000` | Transport operation timeout in ms. |
| `PublishingIntervalMs` | `1000` | Subscription publishing interval in ms. |
| `KeepAliveCount` | `10` | Keep-alive frames before session timeout. |
| `LifetimeCount` | `30` | Subscription lifetime in publish intervals. |
| `SamplingIntervalMs` | `1000` | Per-item server sampling rate in ms. |
| `QueueSize` | `10` | Per-item notification buffer size. |
| `SecurityMode` | `None` | `None`, `Sign`, or `SignAndEncrypt`. |
| `AutoAcceptUntrustedCerts` | `true` | Accept untrusted server certificates. |
### Per-connection settings (MxGateway)
Stored in `DataConnection.PrimaryConfiguration`, parsed by `MxGatewayEndpointConfigSerializer.FromFlatDict`.
| Key | Default | Description |
|-----|---------|-------------|
| `Endpoint` | `http://localhost:5000` | Gateway base URL. |
| `ApiKey` | — | Sent as `Authorization: Bearer <key>`. Redacted in logs. |
| `ClientName` | `scadabridge` | MXAccess client registration name. |
| `WriteUserId` | `0` | MXAccess user id on writes. `0` = no user context. |
| `UseTls` | `false` | Use TLS. |
| `CaFile` | — | CA certificate file path (TLS only). |
| `ServerName` | — | TLS server-name override. |
| `ReadTimeoutMs` | `5000` | `ReadBulk` per-call timeout in ms. |
### Per-connection failover settings
Carried on `CreateConnectionCommand` from the deployment artifact.
| Field | Default | Description |
|-------|---------|-------------|
| `BackupConnectionDetails` | `null` | If absent, the connection retries its single endpoint indefinitely. |
| `FailoverRetryCount` | `3` | Consecutive failures (or unstable disconnects) before switching endpoints. |
## Dependencies & Interactions
- [Site Runtime (#3)](./SiteRuntime.md) — `DeploymentManagerActor` sends `CreateConnectionCommand` / `RemoveConnectionCommand` to create and tear down connections during instance deployment. Instance Actors send `SubscribeTagsRequest` / `UnsubscribeTagsRequest` / `WriteTagRequest`. `NativeAlarmActor` (peer to `AlarmActor` under `InstanceActor`) sends `SubscribeAlarmsRequest` / `UnsubscribeAlarmsRequest` and receives `NativeAlarmTransitionUpdate` / `NativeAlarmSourceUnavailable`.
- [Commons (#16)](./Commons.md) — owns `IDataConnection`, `IAlarmSubscribableConnection`, `IBrowsableDataConnection` interfaces; `TagValue`, `QualityCode`, `NativeAlarmTransition`, `AlarmConditionState`, `AlarmTransitionKind` types; and all message contracts (`SubscribeTagsRequest/Response`, `TagValueUpdate`, `WriteTagRequest/Response`, `SubscribeAlarmsRequest/Response`, `NativeAlarmTransitionUpdate`, `NativeAlarmSourceUnavailable`, `CreateConnectionCommand`, `BrowseNodeCommand/Result`, `DataConnectionHealthReport`).
- [Health Monitoring (#11)](./HealthMonitoring.md) — `DataConnectionActor` calls `ISiteHealthCollector.UpdateConnectionHealth`, `UpdateTagResolution`, `UpdateTagQuality`, `UpdateConnectionEndpoint`, and `RemoveConnection` to keep the site health report current. `DataConnectionManagerActor` handles `GetAllHealthReports` by forwarding `GetHealthReport` to each child.
- [Site Event Logging (#12)](./SiteEventLogging.md) — `DataConnectionActor` logs connection lost, restored, and failover events via `ISiteEventLogger.LogEventAsync` (fire-and-forget). Absent in tests where `ISiteEventLogger` is null.
- Design spec: [Component-DataConnectionLayer.md](../requirements/Component-DataConnectionLayer.md).
## Troubleshooting
### Tags stuck in bad quality after reconnect
After a reconnect, `ReSubscribeAll` re-issues all subscriptions. Tags that resolve successfully immediately receive a seeded value from `ReadAsync` before the subscription's first change notification arrives, so the Instance Actor sees a good-quality value promptly. Tags that fail resolution land in `_unresolvedTags` and are retried at `TagResolutionRetryInterval`. If a tag is persistently bad, check whether the tag path exists on the device and whether the device is online. The health report's `ResolvedTags` vs `TotalSubscribedTags` counters expose the gap without requiring the debug view.
### Connection not failing over to backup
The failover counter uses two independent paths: `_consecutiveFailures` for connect-attempt failures, and `_consecutiveUnstableDisconnects` for connections that drop before `StableConnectionThreshold`. Both must reach `FailoverRetryCount` to trigger a switch. A connection that succeeds but immediately drops does not increment `_consecutiveFailures` — it increments `_consecutiveUnstableDisconnects` instead. Verify both counters are relevant to the observed failure pattern. Failover events are written to Site Event Logging as `Warning` entries.
### Browse hangs on "loading…"
The `DataConnectionActor` caps `BrowseNodeResult` to ~100 KB before returning it. If the picker hangs rather than showing a "results truncated" hint, the reply may have been silently discarded by Akka remoting before reaching central (the reply crosses the site→central frame). Check whether the connection actor is in `Connecting` or `Reconnecting` state — browse while disconnected returns `BrowseFailureKind.ConnectionNotConnected` immediately rather than hanging.
### Alarm feed not receiving transitions
Confirm that the connection's adapter implements `IAlarmSubscribableConnection` (currently `OpcUaDataConnection` and `MxGatewayDataConnection`). A non-capable adapter returns `SubscribeAlarmsResponse(Success = false)`. If the capability is present, check whether the connection is in `Connected` state — `SubscribeAlarmsRequest` is stashed while `Connecting` or `Reconnecting` and processed on entering `Connected`. On reconnect, `ReSubscribeAllAlarms` re-opens the feeds and the adapter replays a snapshot.
## Related Documentation
- [Data Connection Layer design specification](../requirements/Component-DataConnectionLayer.md)
- [Site Runtime](./SiteRuntime.md)
- [Commons](./Commons.md)
- [Health Monitoring](./HealthMonitoring.md)
- [Site Event Logging](./SiteEventLogging.md)
- [CentralSite Communication](./Communication.md)