# Data Connection Layer

The Data Connection Layer is the site-only clean data pipe that abstracts protocol-specific communication behind a uniform actor/interface model, delivering live tag values and native alarm transitions to Instance Actors while hiding all connection lifecycle complexity from the rest of the system.

## Overview

The Data Connection Layer (#4) runs exclusively on site nodes. Central never reads from or writes to physical devices directly. The component code lives in `src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/`, with adapters in `Adapters/` and the actor hierarchy in `Actors/`.

Key collaborators are:

- `DataConnectionManagerActor`: the site-level router. One actor per site, child of the Site Runtime hierarchy. It owns the `connectionName → IActorRef` map and routes every inbound message to the right `DataConnectionActor`.
- `DataConnectionActor`: one per configured data connection. Owns a single `IDataConnection` adapter and models the full connection lifecycle as a Become/Stash state machine.
- `IDataConnection` (Commons): the protocol adapter contract. Implemented by `OpcUaDataConnection` and `MxGatewayDataConnection`.
- `DataConnectionFactory` / `IDataConnectionFactory`: resolves a case-insensitive `ProtocolType` string to a fresh `IDataConnection` instance.

DI registration is through `ServiceCollectionExtensions.AddDataConnectionLayer`, which binds `DataConnectionOptions` from the `DataConnectionLayer` section and `OpcUaGlobalOptions` from the `OpcUa` section. The actors themselves are created inside the ActorSystem, not by DI.

## Key Concepts

### Clean data pipe

The DCL performs no alarm evaluation, no trigger evaluation, and no business logic. Its only job is to subscribe to tag paths requested by Instance Actors, deliver `TagValueUpdate` messages when values change, and route write requests down to the device. Every value change follows a single path: device → adapter callback → `DataConnectionActor` → `InstanceActor`.
The DCL never publishes to any actor other than the Instance Actors that subscribed.

### Become/Stash lifecycle state machine

`DataConnectionActor` uses Akka.NET's `Become`/`Stash` pattern to model three states:

| State | Behaviour |
|-------|-----------|
| `Connecting` | Stashes `SubscribeTagsRequest`, `WriteTagRequest`, `UnsubscribeTagsRequest`, and `SubscribeAlarmsRequest`. Does not stash `BrowseNodeCommand` or `ReadTagValuesCommand`; both get an immediate failure reply. |
| `Connected` | Processes all message types. On entering, calls `Stash.UnstashAll()`. |
| `Reconnecting` | Stashes new subscribe and write requests; lets `UnsubscribeTagsRequest` and `UnsubscribeAlarmsRequest` through so cleanup can run when an instance stops. |

`BecomeConnecting` fires immediately in `PreStart`. `BecomeConnected` calls `Stash.UnstashAll()`, so subscribe requests that were queued while instances raced the connection at startup are processed without loss. The actor declares the `IWithStash` and `IWithTimers` interfaces; Akka.NET populates the corresponding `Stash` and `Timers` properties.

### Adapter generation guard

Each `DataConnectionActor` holds an `_adapterGeneration` integer. When a new `IDataConnection` is created on failover, the generation is incremented. Every subscription callback captures the generation in force when it is registered; a `TagValueReceived` or `AlarmTransitionReceived` whose generation does not match `_adapterGeneration` is silently dropped. This ensures that stale callbacks from a disposed OPC UA SDK session or a closed gRPC stream never reach Instance Actors.

### Protocol extensibility

Adding a new protocol requires implementing `IDataConnection` (and optionally `IBrowsableDataConnection` and/or `IAlarmSubscribableConnection`) and calling `DataConnectionFactory.RegisterAdapter` with the new `ProtocolType` string. The manager and the actor are protocol-agnostic: they always work through the interfaces.
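The adapter generation guard described above can be illustrated with a small, framework-free sketch. It is not the actor's actual code: `GenerationGuard`, `BeginNewAdapter`, `CreateCallback`, and `Delivered` are hypothetical names, and the `TagValueReceived` record here is a stand-in whose real shape is not shown in this document. The real actor marshals callbacks onto its mailbox with `Tell`; the sketch calls the handler directly to keep the example self-contained.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the actor's internal message.
public sealed record TagValueReceived(string TagPath, object? Value, int Generation);

public sealed class GenerationGuard
{
    private int _adapterGeneration;

    // Messages that passed the guard (the real actor would forward these to Instance Actors).
    public List<TagValueReceived> Delivered { get; } = new();

    // Called when the adapter is replaced; callbacks registered earlier become stale.
    public void BeginNewAdapter() => _adapterGeneration++;

    // Builds a callback that remembers the generation in force at registration time.
    public Action<string, object?> CreateCallback()
    {
        var captured = _adapterGeneration;
        return (tagPath, value) => Handle(new TagValueReceived(tagPath, value, captured));
    }

    private void Handle(TagValueReceived message)
    {
        // A mismatched generation means the callback belongs to a disposed adapter: drop it.
        if (message.Generation != _adapterGeneration) return;
        Delivered.Add(message);
    }
}

public static class GenerationGuardDemo
{
    public static int Run()
    {
        var guard = new GenerationGuard();
        var oldCallback = guard.CreateCallback();
        oldCallback("Pump1.Speed", 42.0);            // current generation: delivered

        guard.BeginNewAdapter();                     // simulated failover
        oldCallback("Pump1.Speed", 43.0);            // stale generation: dropped

        guard.CreateCallback()("Pump1.Speed", 44.0); // registered on the new adapter: delivered
        return guard.Delivered.Count;
    }
}
```

Capturing the generation in the closure, rather than reading the field when the callback fires, is what lets the guard tell old and new adapters apart without tracking the adapter instances themselves.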
## Architecture

### Actor hierarchy

```
DataConnectionManagerActor          (one per site; child of DeploymentManagerActor)
├── DataConnectionActor "OpcSrv1"   (one per configured connection)
├── DataConnectionActor "Gateway2"
└── ...
```

`DataConnectionManagerActor` is a `ReceiveActor`. It routes by connection name: `HandleRoute` calls `actor.Forward(request)` when the connection exists or sends a typed failure reply when it does not. The manager owns `ConnectionNotFound` failures; `DataConnectionActor` owns everything else (not connected, server errors, capability checks). The manager's `SupervisorStrategy` is `Resume` with up to ten restarts per minute, so a transient exception on a child never collapses the routing table.

`CreateConnectionCommand` (from the Site Runtime's `DeploymentManagerActor`) triggers `HandleCreateConnection`. The actor name is sanitised from the connection's display name to satisfy Akka's path character constraints.

### Connection state machine

```csharp
// DataConnectionActor.cs: PreStart and state transitions
protected override void PreStart()
{
    _self = Self; // capture for off-thread use
    _adapter.Disconnected += OnAdapterDisconnected;
    BecomeConnecting();
}

private void OnAdapterDisconnected()
{
    // Fired from a background thread (gRPC stream, OPC UA keep-alive timer).
    _self.Tell(new AdapterDisconnected()); // marshal onto the actor loop
}

private void BecomeConnecting()
{
    _healthCollector.UpdateConnectionHealth(_connectionName, ConnectionHealth.Connecting);
    Become(Connecting);
    Self.Tell(new AttemptConnect());
}

private void BecomeConnected()
{
    _lastConnectedAt = DateTimeOffset.UtcNow;
    _healthCollector.UpdateConnectionHealth(_connectionName, ConnectionHealth.Connected);
    Become(Connected);
    Stash.UnstashAll();
}

private void BecomeReconnecting()
{
    _healthCollector.UpdateConnectionHealth(_connectionName, ConnectionHealth.Disconnected);
    Become(Reconnecting);
    PushBadQualityForAllTags();   // immediate bad quality: Instance Actors see it now
    PushAlarmSourceUnavailable(); // native alarm subscribers notified
    Timers.StartSingleTimer("reconnect", new AttemptConnect(), _options.ReconnectInterval);
}
```

`AttemptConnect` fires `_adapter.ConnectAsync(...)` and pipes `ConnectResult` back to the actor via `PipeTo`. On success, `BecomeConnected` is called; on failure, the consecutive-failure counter increments and the reconnect timer is re-armed.

### Transparent re-subscribe

`ReSubscribeAll` runs immediately after a successful reconnect, before `BecomeConnected`. It derives the tag list from `_subscriptionsByInstance` (the durable record of what every Instance Actor asked for) rather than from `_subscriptionIds` (adapter handles, which are cleared on every disconnect). All in-flight and unresolved sets are cleared so the new adapter starts clean. Subscriptions are re-issued in parallel via `PipeTo`, so the actor is never blocked during re-subscribe.

```csharp
// DataConnectionActor.cs: HandleReconnectResult
if (result.Success)
{
    _consecutiveFailures = 0;
    ReSubscribeAll();       // tag subscriptions
    ReSubscribeAllAlarms(); // native alarm feeds
    BecomeConnected();
}
```

### Failover between primary and backup endpoints

When `BackupConnectionDetails` is provided, the actor tracks consecutive failures and unstable disconnects. After `FailoverRetryCount` consecutive failures (or unstable connections shorter than `StableConnectionThreshold`), the actor disposes the current adapter, creates a fresh one with the other endpoint's config via `DataConnectionFactory.Create`, and re-arms the connect timer. Failover is round-robin: primary → backup → primary. There is no auto-failback; the connection stays on whichever endpoint is currently working.

### Write path

Writes are never stored and forwarded: each write is attempted once and its outcome is reported back to the caller.
The `DataConnectionActor` pipes the result of `_adapter.WriteAsync` back to the original sender:

```csharp
// DataConnectionActor.cs: HandleWrite
private void HandleWrite(WriteTagRequest request)
{
    var sender = Sender; // capture now; Sender is not valid inside the continuation
    var cts = new CancellationTokenSource(_options.WriteTimeout);

    _adapter.WriteAsync(request.TagPath, request.Value, cts.Token).ContinueWith(t =>
    {
        cts.Dispose();

        // Adapter completed: report its own success flag and error message.
        if (t.IsCompletedSuccessfully)
            return new WriteTagResponse(request.CorrelationId, t.Result.Success,
                t.Result.ErrorMessage, DateTimeOffset.UtcNow);

        // Cancellation here means the WriteTimeout elapsed.
        if (t.IsCanceled || t.Exception?.GetBaseException() is OperationCanceledException)
            return new WriteTagResponse(request.CorrelationId, false,
                $"Write timeout after {_options.WriteTimeout.TotalSeconds:F0}s", DateTimeOffset.UtcNow);

        // Any other fault: surface the underlying exception message.
        return new WriteTagResponse(request.CorrelationId, false,
            t.Exception?.GetBaseException().Message, DateTimeOffset.UtcNow);
    }).PipeTo(sender);
}
```

From the calling script's point of view the write is synchronous: the Instance Actor awaits `WriteTagResponse` and returns the outcome to the script. There is no store-and-forward for writes, because buffering stale setpoints for later replay is unsafe in a control context.

### Tag path resolution retry

When `_adapter.SubscribeAsync` throws a resolution-level exception (node not found, device still booting), the tag is added to `_unresolvedTags` and marked `QualityCode.Bad`. A periodic `RetryTagResolution` timer fires at `TagResolutionRetryInterval`. On each tick, only tags that are not already in `_resolutionInFlight` are dispatched, which prevents duplicate concurrent subscribe calls against a slow device. When resolution succeeds, the tag moves from `_unresolvedTags` to `_subscriptionIds`, `_resolvedTags` is incremented, and the timer is cancelled once the set is empty.

A separate in-flight set, `_subscribesInFlight`, tracks the initial `SubscribeAsync` for newly requested tags.
When two `SubscribeTagsRequest` messages arrive for different instances that share a tag path, the second finds the tag already handled, so only one adapter subscribe is issued.

### Tag subscriber reference counting

`_tagSubscriberCount` maps each tag path to the number of instances subscribed to it. When an instance unsubscribes, the count is decremented. The adapter `UnsubscribeAsync` call and the quality / resolution counters are updated only when the count reaches zero. A shared tag path (subscribed by two different instances bound to the same connection) therefore remains active at the adapter until both instances stop.

## Usage

### Creating a connection

The Site Runtime's `DeploymentManagerActor` sends `CreateConnectionCommand` to `DataConnectionManagerActor` during instance deployment:

```csharp
// Commons/Messages/DataConnection/CreateConnectionCommand.cs
public record CreateConnectionCommand(
    string ConnectionName,
    string ProtocolType,
    IDictionary PrimaryConnectionDetails,
    IDictionary? BackupConnectionDetails = null,
    int FailoverRetryCount = 3);
```

`DataConnectionManagerActor.HandleCreateConnection` calls `DataConnectionFactory.Create(ProtocolType, PrimaryConnectionDetails)` to produce the initial `IDataConnection` adapter and spawns a `DataConnectionActor` child.

### Subscribing and receiving values

Instance Actors send `SubscribeTagsRequest` and receive `TagValueUpdate` messages. The actor sends a `TagValueUpdate` to the Instance Actor's ref for every value change notification from the adapter. On disconnect, `ConnectionQualityChanged` (with `QualityCode.Bad`) is sent to all subscribers so Instance Actors reflect staleness immediately without waiting for a per-tag callback.

### Browse (design-time only)

`BrowseNodeCommand` is routed to the named `DataConnectionActor`. If the adapter implements `IBrowsableDataConnection`, the actor calls `BrowseChildrenAsync` and pipes `BrowseNodeResult` back.
The result is capped to ~100 KB before reply to stay within the Akka remote frame budget on the site→central crossing. Browse is never called on the hot path and Instance Actors never use it.

### Native alarm subscription

`NativeAlarmActor` (Site Runtime) sends `SubscribeAlarmsRequest` to the `DataConnectionManagerActor`, which forwards it to the named `DataConnectionActor`:

```csharp
// DataConnectionActor.cs: HandleSubscribeAlarms (key excerpt)
if (_adapter is not IAlarmSubscribableConnection alarmable)
{
    subscriber.Tell(new SubscribeAlarmsResponse(
        request.CorrelationId, request.InstanceUniqueName, false,
        $"Connection '{_connectionName}' is not alarm-capable.", now));
    return;
}

// register the subscriber for routing before issuing the adapter call
_alarmSourceSubscribers[request.SourceReference].Add(subscriber);

// open one feed per source reference; subsequent subscribers reuse it
alarmable.SubscribeAlarmsAsync(sourceRef, filter,
        t => self.Tell(new AlarmTransitionReceived(t, generation)))
    .ContinueWith(task => ...)
    .PipeTo(self);
```

Incoming `AlarmTransitionReceived` is routed to all subscribers whose registered `SourceReference` is a prefix of the transition's `SourceObjectReference` or `SourceReference`. On disconnect, `PushAlarmSourceUnavailable` sends `NativeAlarmSourceUnavailable` to every alarm subscriber; on reconnect, `ReSubscribeAllAlarms` re-opens the feeds and the adapter replays a snapshot of currently-active conditions.

## Configuration

All settings under `DataConnectionLayer` in `appsettings.json` bind to `DataConnectionOptions`. Global protocol settings (`OpcUa`, `MxGateway` sections) bind to `OpcUaGlobalOptions` and `MxGatewayGlobalOptions`.

### Shared settings (`DataConnectionLayer` section)

| Key | Default | Description |
|-----|---------|-------------|
| `ReconnectInterval` | `00:00:05` | Fixed interval between reconnection attempts after a disconnect or failed connect. |
| `TagResolutionRetryInterval` | `00:00:10` | Retry interval for tag paths that could not be resolved on the device. |
| `WriteTimeout` | `00:00:30` | Timeout applied to each `WriteAsync` call. A hung write times out and returns an error to the calling script. |
| `StableConnectionThreshold` | `00:01:00` | A connection that drops before this duration is counted as an unstable disconnect toward `FailoverRetryCount`. |

### OPC UA global settings (`OpcUa` section)

| Key | Default | Description |
|-----|---------|-------------|
| `ApplicationName` | `ScadaBridge-DCL` | Application name used in the OPC UA certificate and session negotiation. |
| `TrustedIssuerStorePath` | `""` | File path to the trusted issuer certificate store. |
| `TrustedPeerStorePath` | `""` | File path to the trusted peer certificate store. |
| `RejectedCertificateStorePath` | `""` | File path to the rejected certificate store. |

Empty store paths fall back to a temp-path default so dev runs work without explicit configuration.

### Per-connection settings (OPC UA)

Stored in the `DataConnection.PrimaryConfiguration` JSON dict, parsed by `OpcUaEndpointConfigSerializer.FromFlatDict`.

| Key | Default | Description |
|-----|---------|-------------|
| `endpoint` / `EndpointUrl` | `opc.tcp://localhost:4840` | OPC UA server endpoint URL. |
| `SessionTimeoutMs` | `60000` | Session timeout in ms. |
| `OperationTimeoutMs` | `15000` | Transport operation timeout in ms. |
| `PublishingIntervalMs` | `1000` | Subscription publishing interval in ms. |
| `KeepAliveCount` | `10` | Keep-alive frames before session timeout. |
| `LifetimeCount` | `30` | Subscription lifetime in publish intervals. |
| `SamplingIntervalMs` | `1000` | Per-item server sampling rate in ms. |
| `QueueSize` | `10` | Per-item notification buffer size. |
| `SecurityMode` | `None` | `None`, `Sign`, or `SignAndEncrypt`. |
| `AutoAcceptUntrustedCerts` | `true` | Accept untrusted server certificates. |

### Per-connection settings (MxGateway)

Stored in `DataConnection.PrimaryConfiguration`, parsed by `MxGatewayEndpointConfigSerializer.FromFlatDict`.

| Key | Default | Description |
|-----|---------|-------------|
| `Endpoint` | `http://localhost:5000` | Gateway base URL. |
| `ApiKey` | (none) | Sent as the bearer token in the `Authorization` header. Redacted in logs. |
| `ClientName` | `scadabridge` | MXAccess client registration name. |
| `WriteUserId` | `0` | MXAccess user id on writes. `0` = no user context. |
| `UseTls` | `false` | Use TLS. |
| `CaFile` | (none) | CA certificate file path (TLS only). |
| `ServerName` | (none) | TLS server-name override. |
| `ReadTimeoutMs` | `5000` | `ReadBulk` per-call timeout in ms. |

### Per-connection failover settings

Carried on `CreateConnectionCommand` from the deployment artifact.

| Field | Default | Description |
|-------|---------|-------------|
| `BackupConnectionDetails` | `null` | If absent, the connection retries its single endpoint indefinitely. |
| `FailoverRetryCount` | `3` | Consecutive failures (or unstable disconnects) before switching endpoints. |

## Dependencies & Interactions

- [Site Runtime (#3)](./SiteRuntime.md): `DeploymentManagerActor` sends `CreateConnectionCommand` / `RemoveConnectionCommand` to create and tear down connections during instance deployment. Instance Actors send `SubscribeTagsRequest` / `UnsubscribeTagsRequest` / `WriteTagRequest`. `NativeAlarmActor` (peer to `AlarmActor` under `InstanceActor`) sends `SubscribeAlarmsRequest` / `UnsubscribeAlarmsRequest` and receives `NativeAlarmTransitionUpdate` / `NativeAlarmSourceUnavailable`.
- [Commons (#16)](./Commons.md): owns the `IDataConnection`, `IAlarmSubscribableConnection`, and `IBrowsableDataConnection` interfaces; the `TagValue`, `QualityCode`, `NativeAlarmTransition`, `AlarmConditionState`, and `AlarmTransitionKind` types; and all message contracts (`SubscribeTagsRequest/Response`, `TagValueUpdate`, `WriteTagRequest/Response`, `SubscribeAlarmsRequest/Response`, `NativeAlarmTransitionUpdate`, `NativeAlarmSourceUnavailable`, `CreateConnectionCommand`, `BrowseNodeCommand/Result`, `DataConnectionHealthReport`).
- [Health Monitoring (#11)](./HealthMonitoring.md): `DataConnectionActor` calls `ISiteHealthCollector.UpdateConnectionHealth`, `UpdateTagResolution`, `UpdateTagQuality`, `UpdateConnectionEndpoint`, and `RemoveConnection` to keep the site health report current. `DataConnectionManagerActor` handles `GetAllHealthReports` by forwarding `GetHealthReport` to each child.
- [Site Event Logging (#12)](./SiteEventLogging.md): `DataConnectionActor` logs connection lost, restored, and failover events via `ISiteEventLogger.LogEventAsync` (fire-and-forget). Absent in tests where `ISiteEventLogger` is null.
- Design spec: [Component-DataConnectionLayer.md](../requirements/Component-DataConnectionLayer.md).

## Troubleshooting

### Tags stuck in bad quality after reconnect

After a reconnect, `ReSubscribeAll` re-issues all subscriptions. Tags that resolve successfully immediately receive a seeded value from `ReadAsync` before the subscription's first change notification arrives, so the Instance Actor sees a good-quality value promptly. Tags that fail resolution land in `_unresolvedTags` and are retried at `TagResolutionRetryInterval`. If a tag is persistently bad, check whether the tag path exists on the device and whether the device is online. The health report's `ResolvedTags` vs `TotalSubscribedTags` counters expose the gap without requiring the debug view.
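When reasoning about a tag that stays bad, it can help to picture how the retry tick chooses its work. The following is a minimal sketch of the behaviour described for `_unresolvedTags` and `_resolutionInFlight`, assuming both are plain sets of tag paths; `ResolutionRetryTracker` and its methods are hypothetical names, not the actor's actual members, and the real actor performs the subscribe asynchronously via the adapter.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

public sealed class ResolutionRetryTracker
{
    private readonly HashSet<string> _unresolvedTags = new();
    private readonly HashSet<string> _resolutionInFlight = new();

    public void MarkUnresolved(string tagPath) => _unresolvedTags.Add(tagPath);

    // One retry tick: select unresolved tags with no attempt outstanding and mark them in flight.
    public IReadOnlyList<string> BeginRetryTick()
    {
        var due = _unresolvedTags.Where(t => !_resolutionInFlight.Contains(t)).ToList();
        foreach (var tag in due) _resolutionInFlight.Add(tag);
        return due;
    }

    // Completion of one attempt. Returns true when nothing is left to retry,
    // which is the point at which a retry timer could be cancelled.
    public bool Complete(string tagPath, bool resolved)
    {
        _resolutionInFlight.Remove(tagPath);
        if (resolved) _unresolvedTags.Remove(tagPath);
        return _unresolvedTags.Count == 0;
    }
}

public static class RetryTickDemo
{
    public static bool Run()
    {
        var tracker = new ResolutionRetryTracker();
        tracker.MarkUnresolved("A");
        tracker.MarkUnresolved("B");

        var first = tracker.BeginRetryTick();   // both tags dispatched
        var second = tracker.BeginRetryTick();  // nothing: both attempts still in flight

        tracker.Complete("A", resolved: true);  // A resolves
        tracker.Complete("B", resolved: false); // B fails and stays unresolved

        var third = tracker.BeginRetryTick();   // only B is retried
        var done = tracker.Complete("B", resolved: true);

        return first.Count == 2 && second.Count == 0
            && third.Count == 1 && third[0] == "B" && done;
    }
}
```

Under this model, a tag that never leaves the unresolved set is retried on every tick once its previous attempt completes, which is why a persistently bad tag usually points at the device or the tag path rather than at the retry loop.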
### Connection not failing over to backup

The failover decision uses two independent counters: `_consecutiveFailures` for connect-attempt failures, and `_consecutiveUnstableDisconnects` for connections that drop before `StableConnectionThreshold`. Either counter reaching `FailoverRetryCount` triggers a switch. A connection that succeeds but immediately drops does not increment `_consecutiveFailures`; it increments `_consecutiveUnstableDisconnects` instead. Check which counter matches the observed failure pattern. Failover events are written to Site Event Logging as `Warning` entries.

### Browse hangs on "loading…"

The `DataConnectionActor` caps `BrowseNodeResult` to ~100 KB before returning it. If the picker hangs rather than showing a "results truncated" hint, the reply may have been silently discarded by Akka remoting before reaching central (the reply crosses the site→central frame). Also check whether the connection actor is in the `Connecting` or `Reconnecting` state: browse while disconnected returns `BrowseFailureKind.ConnectionNotConnected` immediately rather than hanging.

### Alarm feed not receiving transitions

Confirm that the connection's adapter implements `IAlarmSubscribableConnection` (currently `OpcUaDataConnection` and `MxGatewayDataConnection`). A non-capable adapter returns `SubscribeAlarmsResponse(Success = false)`. If the capability is present, check whether the connection is in the `Connected` state: `SubscribeAlarmsRequest` is stashed while `Connecting` or `Reconnecting` and processed on entering `Connected`. On reconnect, `ReSubscribeAllAlarms` re-opens the feeds and the adapter replays a snapshot.

## Related Documentation

- [Data Connection Layer design specification](../requirements/Component-DataConnectionLayer.md)
- [Site Runtime](./SiteRuntime.md)
- [Commons](./Commons.md)
- [Health Monitoring](./HealthMonitoring.md)
- [Site Event Logging](./SiteEventLogging.md)
- [Central–Site Communication](./Communication.md)