Data Connection Layer
The Data Connection Layer is the site-only clean data pipe that abstracts protocol-specific communication behind a uniform actor/interface model, delivering live tag values and native alarm transitions to Instance Actors while hiding all connection lifecycle complexity from the rest of the system.
Overview
The Data Connection Layer (#4) runs exclusively on site nodes. Central never reads from or writes to physical devices directly. The component code lives in src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/, with adapters in Adapters/ and the actor hierarchy in Actors/.
Key collaborators are:
- DataConnectionManagerActor: the site-level router. One actor per site, child of the Site Runtime hierarchy. It owns the connectionName → IActorRef map and routes every inbound message to the right DataConnectionActor.
- DataConnectionActor: one per configured data connection. Owns a single IDataConnection adapter and models the full connection lifecycle as a Become/Stash state machine.
- IDataConnection (Commons): the protocol adapter contract. Implemented by OpcUaDataConnection and MxGatewayDataConnection.
- DataConnectionFactory / IDataConnectionFactory: resolves a case-insensitive ProtocolType string to a fresh IDataConnection instance.
DI registration is through ServiceCollectionExtensions.AddDataConnectionLayer, which binds DataConnectionOptions from the DataConnectionLayer section and OpcUaGlobalOptions from the OpcUa section. The actors themselves are created inside the ActorSystem, not by DI.
Key Concepts
Clean data pipe
The DCL performs no alarm evaluation, no trigger evaluation, and no business logic. Its only job is to subscribe to tag paths requested by Instance Actors, deliver TagValueUpdate messages when values change, and route write requests down to the device. Every value change follows a single path: device → adapter callback → DataConnectionActor → InstanceActor. The DCL never publishes to any actor other than the Instance Actors that subscribed.
Become/Stash lifecycle state machine
DataConnectionActor uses Akka.NET's Become/Stash pattern to model three states cleanly:
| State | Behaviour |
|---|---|
| Connecting | Stashes SubscribeTagsRequest, WriteTagRequest, UnsubscribeTagsRequest, SubscribeAlarmsRequest, UnsubscribeAlarmsRequest. Non-blocking for ReadTagValuesCommand (immediate synchronous failure reply) and BrowseNodeCommand (async failure via PipeTo when the adapter is IBrowsableDataConnection; immediate synchronous reply otherwise). |
| Connected | Processes all message types. On entering, calls Stash.UnstashAll(). |
| Reconnecting | Stashes new subscribe/write requests; allows UnsubscribeTagsRequest and UnsubscribeAlarmsRequest through for cleanup on instance stop. |
BecomeConnecting fires immediately in PreStart. BecomeConnected calls Stash.UnstashAll(), so subscribe requests that arrived during instance startup, before the connection was up, are replayed without loss. The actor declares IWithStash and IWithTimers; the Akka.NET infrastructure injects the Stash and Timers properties for both.
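The park-then-replay behaviour can be illustrated without Akka.NET. The following is a dependency-free sketch, not the actor itself: StashingConnectionSketch, SketchState, and the string messages are hypothetical stand-ins, and the sketch simplifies Reconnecting by parking every request (the real state lets unsubscribe requests through).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, Akka-free model of the Connecting -> Connected hand-off.
public enum SketchState { Connecting, Connected, Reconnecting }

public sealed class StashingConnectionSketch
{
    private readonly Queue<string> _stash = new();

    public SketchState State { get; private set; } = SketchState.Connecting;
    public List<string> Processed { get; } = new();

    // Requests are processed only while Connected; otherwise they are parked.
    public void Handle(string request)
    {
        if (State == SketchState.Connected)
            Processed.Add(request);
        else
            _stash.Enqueue(request);
    }

    // Mirrors the order in BecomeConnected: switch state, then replay in arrival order.
    public void BecomeConnected()
    {
        State = SketchState.Connected;
        while (_stash.Count > 0)
            Processed.Add(_stash.Dequeue());
    }

    // Simplification: every request is parked again until the next BecomeConnected.
    public void BecomeReconnecting() => State = SketchState.Reconnecting;
}
```

In this model a request sent while Connecting is never rejected; it only waits, which is the property the state table describes for subscribe and write requests.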
Adapter generation guard
Each DataConnectionActor holds an _adapterGeneration integer. When a new IDataConnection is created on failover, the generation is incremented. All subscription callbacks capture the generation at the time they are registered; a TagValueReceived or AlarmTransitionReceived whose generation does not match _adapterGeneration is silently dropped. This ensures stale callbacks from a disposed OPC UA SDK session or a closed gRPC stream never reach Instance Actors.
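A minimal sketch of the guard, assuming callbacks are closures that capture the generation when they are registered. GenerationGuardSketch and its members are hypothetical names for illustration; only _adapterGeneration comes from the description above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the actor's generation check; not the real DataConnectionActor.
public sealed class GenerationGuardSketch
{
    private int _adapterGeneration;

    public List<string> Delivered { get; } = new();

    // Called on failover when a fresh adapter replaces the old one.
    public void ReplaceAdapter() => _adapterGeneration++;

    // Returns a callback stamped with the generation current at registration time.
    public Action<string> RegisterCallback()
    {
        var captured = _adapterGeneration;
        return value => OnValue(value, captured);
    }

    private void OnValue(string value, int generation)
    {
        if (generation != _adapterGeneration)
            return; // stale callback from a disposed adapter: drop silently

        Delivered.Add(value);
    }
}
```

Because the check compares integers rather than adapter references, a late callback from the old session is rejected even though the old adapter object may still be alive on a background thread.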
Protocol extensibility
Adding a new protocol requires implementing IDataConnection (and optionally IBrowsableDataConnection and/or IAlarmSubscribableConnection) and calling DataConnectionFactory.RegisterAdapter with the new ProtocolType string. The manager and the actor are protocol-agnostic — they always work through the interfaces.
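The registration step might look roughly like the sketch below. The real signatures of DataConnectionFactory.RegisterAdapter and Create are not shown in this document, so the delegate-based shape, the ISketchConnection interface, and the "Modbus" adapter are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical minimal contract; the real IDataConnection lives in Commons and has more members.
public interface ISketchConnection
{
    string Protocol { get; }
}

// Hypothetical adapter for a protocol that does not ship with the component.
public sealed class ModbusSketchConnection : ISketchConnection
{
    public string Protocol => "Modbus";
}

// Assumed shape only: a case-insensitive map from ProtocolType to a factory delegate.
public sealed class AdapterRegistrySketch
{
    private readonly Dictionary<string, Func<IDictionary<string, string>, ISketchConnection>> _factories =
        new(StringComparer.OrdinalIgnoreCase);

    public void RegisterAdapter(
        string protocolType,
        Func<IDictionary<string, string>, ISketchConnection> factory) =>
        _factories[protocolType] = factory;

    // Each call invokes the delegate, so callers always get a fresh instance.
    public ISketchConnection Create(string protocolType, IDictionary<string, string> details) =>
        _factories.TryGetValue(protocolType, out var factory)
            ? factory(details)
            : throw new ArgumentException($"Unknown protocol type '{protocolType}'.", nameof(protocolType));
}
```

Returning a new instance per call matters for failover, where the actor disposes the current adapter and asks the factory for another.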
Architecture
Actor hierarchy
DataConnectionManagerActor (one per site; child of DeploymentManagerActor)
├── DataConnectionActor "OpcSrv1" (one per configured connection)
├── DataConnectionActor "Gateway2"
└── ...
DataConnectionManagerActor is a ReceiveActor. It routes by connection name: HandleRoute calls actor.Forward(request) when the connection exists or sends a typed failure reply when it does not. The manager owns ConnectionNotFound failures; DataConnectionActor owns everything else (not connected, server errors, capability checks). The manager's SupervisorStrategy is Resume with up to ten restarts per minute, so a transient exception on a child never collapses the routing table.
CreateConnectionCommand (from the Site Runtime's DeploymentManagerActor) triggers HandleCreateConnection. The actor name is sanitised from the connection's display name to satisfy Akka's path character constraints.
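The sanitiser itself is not shown here. As a sketch of the idea, the helper below keeps a deliberately conservative character set (ASCII letters, digits, '-' and '_') and replaces everything else; ActorNameSketch.Sanitise and that character set are assumptions, and the real implementation may allow more.

```csharp
using System;
using System.Text;

public static class ActorNameSketch
{
    // Conservative assumption: keep ASCII letters, digits, '-' and '_'; replace the rest with '_'.
    public static string Sanitise(string displayName)
    {
        if (string.IsNullOrEmpty(displayName))
            throw new ArgumentException("Connection name must not be empty.", nameof(displayName));

        var sb = new StringBuilder(displayName.Length);
        foreach (var ch in displayName)
        {
            var allowed = (ch >= 'a' && ch <= 'z') || (ch >= 'A' && ch <= 'Z') ||
                          (ch >= '0' && ch <= '9') || ch == '-' || ch == '_';
            sb.Append(allowed ? ch : '_');
        }

        return sb.ToString();
    }
}
```

One caveat of any replace-based scheme: two display names such as "Opc Srv" and "Opc/Srv" map to the same actor name, so the routing map should stay keyed by the original connection name rather than the sanitised one.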
Connection state machine
// DataConnectionActor.cs — PreStart and state transitions
protected override void PreStart()
{
_self = Self; // capture for off-thread use
_adapter.Disconnected += OnAdapterDisconnected;
BecomeConnecting();
}
private void OnAdapterDisconnected()
{
// fired from a background thread (gRPC stream, OPC UA keep-alive timer)
_self.Tell(new AdapterDisconnected()); // marshal onto actor loop
}
private void BecomeConnecting()
{
_healthCollector.UpdateConnectionHealth(_connectionName, ConnectionHealth.Connecting);
Become(Connecting);
Self.Tell(new AttemptConnect());
}
private void BecomeConnected()
{
_lastConnectedAt = DateTimeOffset.UtcNow;
_healthCollector.UpdateConnectionHealth(_connectionName, ConnectionHealth.Connected);
Become(Connected);
Stash.UnstashAll();
}
private void BecomeReconnecting()
{
_healthCollector.UpdateConnectionHealth(_connectionName, ConnectionHealth.Disconnected);
Become(Reconnecting);
PushBadQualityForAllTags(); // immediate bad quality — Instance Actors see it now
PushAlarmSourceUnavailable(); // native alarm subscribers notified
Timers.StartSingleTimer("reconnect", new AttemptConnect(), _options.ReconnectInterval);
}
AttemptConnect fires _adapter.ConnectAsync(...) and pipes ConnectResult back to the actor via PipeTo. On success, BecomeConnected is called; on failure, _consecutiveFailures increments and the reconnect timer is re-armed.
Transparent re-subscribe
ReSubscribeAll runs immediately after a successful reconnect, before BecomeConnected. It derives the tag list from _subscriptionsByInstance (the durable record of what every Instance Actor asked for) rather than from _subscriptionIds (which are adapter handles cleared on every disconnect). All in-flight and unresolved sets are cleared so the new adapter starts clean. Subscriptions are re-issued in parallel via PipeTo, so the actor is never blocked during re-subscribe.
// DataConnectionActor.cs — HandleReconnectResult
if (result.Success)
{
_consecutiveFailures = 0;
ReSubscribeAll(); // tag subscriptions
ReSubscribeAllAlarms(); // native alarm feeds
BecomeConnected();
}
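Deriving the re-subscribe list from the durable record could be sketched as follows. The shape of _subscriptionsByInstance is assumed here to be a map from instance name to the tag paths that instance requested, and ReSubscribeSketch is a hypothetical helper, not part of the actor.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ReSubscribeSketch
{
    // subscriptionsByInstance: instance name -> tag paths that instance asked for.
    // Returns each distinct tag path once, so a tag shared by several instances
    // yields a single adapter subscribe after reconnect.
    public static IReadOnlyList<string> TagsToReSubscribe(
        IReadOnlyDictionary<string, HashSet<string>> subscriptionsByInstance) =>
        subscriptionsByInstance.Values
            .SelectMany(tags => tags)
            .Distinct(StringComparer.Ordinal)
            .OrderBy(tag => tag, StringComparer.Ordinal) // stable order, for readability only
            .ToList();
}
```

The point of starting from this map is that it survives a disconnect, whereas adapter handles belong to the old session and are meaningless to the new one.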
Failover between primary and backup endpoints
When BackupConnectionDetails is provided, the actor tracks consecutive failures and unstable disconnects. After FailoverRetryCount consecutive failures (or unstable connections shorter than StableConnectionThreshold), the actor disposes the current adapter, creates a fresh one with the other endpoint's config via DataConnectionFactory.Create, and re-arms the connect timer. Failover is round-robin: primary → backup → primary. There is no auto-failback; the connection stays on whichever endpoint is currently working.
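The counting rules can be sketched as a small tracker. This is an illustration under stated assumptions, not the actor's code: FailoverTrackerSketch is hypothetical, it treats either counter reaching FailoverRetryCount as the trigger (following the "or" wording above), and it assumes that both counters reset on a switch and that a stable disconnect resets the unstable counter.

```csharp
using System;

public sealed class FailoverTrackerSketch
{
    private readonly int _failoverRetryCount;
    private readonly TimeSpan _stableThreshold;
    private readonly bool _hasBackup;
    private int _consecutiveFailures;
    private int _consecutiveUnstableDisconnects;

    public FailoverTrackerSketch(int failoverRetryCount, TimeSpan stableThreshold, bool hasBackup)
    {
        _failoverRetryCount = failoverRetryCount;
        _stableThreshold = stableThreshold;
        _hasBackup = hasBackup;
    }

    public bool OnPrimary { get; private set; } = true;

    public void OnConnected() => _consecutiveFailures = 0;

    public void OnConnectFailed()
    {
        _consecutiveFailures++;
        SwitchIfThresholdReached();
    }

    // uptime: how long the connection stayed up before it dropped.
    public void OnDisconnected(TimeSpan uptime)
    {
        if (uptime < _stableThreshold)
            _consecutiveUnstableDisconnects++;
        else
            _consecutiveUnstableDisconnects = 0; // assumption: a stable run clears the streak

        SwitchIfThresholdReached();
    }

    private void SwitchIfThresholdReached()
    {
        if (!_hasBackup)
            return; // single endpoint: keep retrying it indefinitely

        if (_consecutiveFailures < _failoverRetryCount &&
            _consecutiveUnstableDisconnects < _failoverRetryCount)
            return;

        OnPrimary = !OnPrimary; // alternate endpoints; there is no automatic failback
        _consecutiveFailures = 0;
        _consecutiveUnstableDisconnects = 0;
    }
}
```

With the default FailoverRetryCount of 3, three failed connects in a row move the tracker to the backup, and it stays there until the backup itself accumulates three failures or unstable disconnects.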
Write path
Writes are never buffered: there is no store-and-forward, and that is the only sense in which they are fire-and-forget. The script still receives a result, because the DataConnectionActor pipes the outcome of _adapter.WriteAsync back to the original sender:
// DataConnectionActor.cs — HandleWrite
private void HandleWrite(WriteTagRequest request)
{
var sender = Sender;
var cts = new CancellationTokenSource(_options.WriteTimeout);
_adapter.WriteAsync(request.TagPath, request.Value, cts.Token).ContinueWith(t =>
{
cts.Dispose();
if (t.IsCompletedSuccessfully)
return new WriteTagResponse(request.CorrelationId, t.Result.Success,
t.Result.ErrorMessage, DateTimeOffset.UtcNow);
if (t.IsCanceled || t.Exception?.GetBaseException() is OperationCanceledException)
return new WriteTagResponse(request.CorrelationId, false,
$"Write timeout after {_options.WriteTimeout.TotalSeconds:F0}s", DateTimeOffset.UtcNow);
return new WriteTagResponse(request.CorrelationId, false,
t.Exception?.GetBaseException().Message, DateTimeOffset.UtcNow);
}).PipeTo(sender);
}
WriteTagResponse reaches the calling script through the Instance Actor, which awaits it, so the script sees the write outcome before it continues. There is no store-and-forward for writes: buffering stale setpoints for later replay is unsafe in a control context.
Tag path resolution retry
When _adapter.SubscribeAsync throws a resolution-level exception (node not found, device still booting), the tag is added to _unresolvedTags and marked QualityCode.Bad. A periodic RetryTagResolution timer fires at TagResolutionRetryInterval. Only tags that are not already in _resolutionInFlight are dispatched on each tick, preventing duplicate concurrent subscribe calls for a slow device. When resolution succeeds, the tag moves from _unresolvedTags to _subscriptionIds, _resolvedTags is incremented, and the timer is cancelled once the set is empty.
A separate in-flight set, _subscribesInFlight, tracks the initial SubscribeAsync for newly requested tags. When two SubscribeTagsRequest messages arrive for different instances that share a tag path, the first adds the tag to the set and issues the adapter subscribe; the second observes the tag as already in flight and does not issue another, so only one adapter subscribe is sent.
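The retry tick described above can be sketched as set bookkeeping. ResolutionRetrySketch and its method names are hypothetical; only the _unresolvedTags and _resolutionInFlight field names come from the text, and the real actor dispatches SubscribeAsync calls rather than returning a list.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public sealed class ResolutionRetrySketch
{
    private readonly HashSet<string> _unresolvedTags = new(StringComparer.Ordinal);
    private readonly HashSet<string> _resolutionInFlight = new(StringComparer.Ordinal);

    public void MarkUnresolved(string tagPath) => _unresolvedTags.Add(tagPath);

    // One timer tick: pick unresolved tags with no retry already running and mark them in flight.
    public IReadOnlyList<string> OnRetryTick()
    {
        var batch = _unresolvedTags
            .Where(tag => !_resolutionInFlight.Contains(tag))
            .OrderBy(tag => tag, StringComparer.Ordinal)
            .ToList();

        foreach (var tag in batch)
            _resolutionInFlight.Add(tag);

        return batch;
    }

    // Completion of one retry. Returns true when no unresolved tags remain,
    // which is the point at which the retry timer can be cancelled.
    public bool OnRetryCompleted(string tagPath, bool resolved)
    {
        _resolutionInFlight.Remove(tagPath);
        if (resolved)
            _unresolvedTags.Remove(tagPath);

        return _unresolvedTags.Count == 0;
    }
}
```

A slow device therefore sees at most one outstanding subscribe per tag, however many ticks elapse while that call is pending.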
Tag subscriber reference counting
_tagSubscriberCount maps each tag path to the number of instances subscribed to it. When an instance unsubscribes, the count is decremented. The adapter UnsubscribeAsync call and the quality / resolution counters are updated only when the count reaches zero. This means a shared tag path (subscribed by two different instances bound to the same connection) remains active at the adapter until both instances stop.
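A minimal sketch of the counting rule, assuming a plain dictionary keyed by tag path. TagRefCountSketch and the boolean return convention are hypothetical; the real actor also updates quality and resolution counters at the same points.

```csharp
using System;
using System.Collections.Generic;

public sealed class TagRefCountSketch
{
    private readonly Dictionary<string, int> _tagSubscriberCount = new(StringComparer.Ordinal);

    // Returns true for the first subscriber, i.e. when an adapter subscribe is needed.
    public bool AddSubscriber(string tagPath)
    {
        _tagSubscriberCount.TryGetValue(tagPath, out var count);
        _tagSubscriberCount[tagPath] = count + 1;
        return count == 0;
    }

    // Returns true when the last subscriber left, i.e. when the adapter unsubscribe is due.
    public bool RemoveSubscriber(string tagPath)
    {
        if (!_tagSubscriberCount.TryGetValue(tagPath, out var count))
            return false; // unknown tag: nothing to release

        if (count > 1)
        {
            _tagSubscriberCount[tagPath] = count - 1;
            return false;
        }

        _tagSubscriberCount.Remove(tagPath);
        return true;
    }
}
```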
Usage
Creating a connection
The Site Runtime's DeploymentManagerActor sends CreateConnectionCommand to DataConnectionManagerActor during instance deployment:
// Commons/Messages/DataConnection/CreateConnectionCommand.cs
public record CreateConnectionCommand(
string ConnectionName,
string ProtocolType,
IDictionary<string, string> PrimaryConnectionDetails,
IDictionary<string, string>? BackupConnectionDetails = null,
int FailoverRetryCount = 3);
DataConnectionManagerActor.HandleCreateConnection calls DataConnectionFactory.Create(ProtocolType, PrimaryConnectionDetails) to produce the initial IDataConnection adapter and spawns a DataConnectionActor child.
Subscribing and receiving values
Instance Actors send SubscribeTagsRequest and receive TagValueUpdate messages. The actor sends TagValueUpdate to the Instance Actor's ref for every value change notification from the adapter. On disconnect, ConnectionQualityChanged (with QualityCode.Bad) is sent to all subscribers so Instance Actors reflect staleness immediately without waiting for a per-tag callback.
Browse (design-time only)
BrowseNodeCommand is routed to the named DataConnectionActor. If the adapter implements IBrowsableDataConnection, the actor calls BrowseChildrenAsync and pipes BrowseNodeResult back. The result is capped to ~100 KB before reply to stay within the Akka remote frame budget on the site→central crossing. Browse is never called on the hot path and Instance Actors never use it.
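How the cap is computed is not described here. One plausible approach, sketched below, is to keep child nodes in order until an estimated byte budget is used up and flag the result as truncated. BrowseCapSketch, the UTF-8 size estimate, and representing nodes by name only are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

public static class BrowseCapSketch
{
    // Keeps names in order until their combined UTF-8 size would exceed maxBytes.
    // The Truncated flag lets the caller show a "results truncated" hint.
    public static (List<string> Kept, bool Truncated) Cap(IEnumerable<string> nodeNames, int maxBytes)
    {
        var kept = new List<string>();
        var used = 0;

        foreach (var name in nodeNames)
        {
            var size = Encoding.UTF8.GetByteCount(name);
            if (used + size > maxBytes)
                return (kept, true);

            kept.Add(name);
            used += size;
        }

        return (kept, false);
    }
}
```

A caller targeting the ~100 KB figure above might pass something like 100 * 1024 as maxBytes, leaving headroom for serialization overhead that this estimate ignores.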
Native alarm subscription
NativeAlarmActor (Site Runtime) sends SubscribeAlarmsRequest to the DataConnectionManagerActor, which forwards it to the named DataConnectionActor:
// DataConnectionActor.cs — HandleSubscribeAlarms (key excerpt)
if (_adapter is not IAlarmSubscribableConnection alarmable)
{
subscriber.Tell(new SubscribeAlarmsResponse(
request.CorrelationId, request.InstanceUniqueName, false,
$"Connection '{_connectionName}' is not alarm-capable.", now));
return;
}
// register the subscriber for routing before issuing the adapter call
_alarmSourceSubscribers[request.SourceReference].Add(subscriber);
// open one feed per source reference; subsequent subscribers reuse it
alarmable.SubscribeAlarmsAsync(sourceRef, filter,
t => self.Tell(new AlarmTransitionReceived(t, generation)))
.ContinueWith(task => ...)
.PipeTo(self);
Incoming AlarmTransitionReceived is routed to all subscribers whose registered SourceReference is a prefix of the transition's SourceObjectReference or SourceReference. On disconnect, PushAlarmSourceUnavailable sends NativeAlarmSourceUnavailable to every alarm subscriber; on reconnect, ReSubscribeAllAlarms re-opens the feeds and the adapter replays a snapshot of currently-active conditions.
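The prefix rule can be sketched as a filter over the registered source references. AlarmRoutingSketch and the dotted reference strings are hypothetical, and ordinal (case-sensitive) comparison is an assumption; the document does not say which comparison the actor uses.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class AlarmRoutingSketch
{
    // Returns the registered source references that should receive the transition:
    // those that are a prefix of either reference carried by the transition.
    public static List<string> MatchingSources(
        IEnumerable<string> registeredSources,
        string sourceObjectReference,
        string sourceReference) =>
        registeredSources
            .Where(registered =>
                sourceObjectReference.StartsWith(registered, StringComparison.Ordinal) ||
                sourceReference.StartsWith(registered, StringComparison.Ordinal))
            .ToList();
}
```

Note that a plain string prefix, as sketched here, would also let a registration for "Plant.Area1" match "Plant.Area10"; whether the real routing guards segment boundaries is not stated in this document.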
Configuration
All settings under DataConnectionLayer in appsettings.json bind to DataConnectionOptions. Global protocol settings (OpcUa, MxGateway sections) bind to OpcUaGlobalOptions and MxGatewayGlobalOptions.
Shared settings (DataConnectionLayer section)
| Key | Default | Description |
|---|---|---|
| ReconnectInterval | 00:00:05 | Fixed interval between reconnection attempts after a disconnect or failed connect. |
| TagResolutionRetryInterval | 00:00:10 | Retry interval for tag paths that could not be resolved on the device. |
| WriteTimeout | 00:00:30 | Timeout applied to each WriteAsync call. A hung write times out and returns an error to the calling script. |
| StableConnectionThreshold | 00:01:00 | A connection that drops before this duration is counted as an unstable disconnect toward FailoverRetryCount. |
OPC UA global settings (OpcUa section)
| Key | Default | Description |
|---|---|---|
| ApplicationName | ScadaBridge-DCL | Application name used in the OPC UA certificate and session negotiation. |
| TrustedIssuerStorePath | "" | File path to the trusted issuer certificate store. |
| TrustedPeerStorePath | "" | File path to the trusted peer certificate store. |
| RejectedCertificateStorePath | "" | File path to the rejected certificate store. |
Empty store paths fall back to a temp-path default so dev runs work without explicit configuration.
Per-connection settings (OPC UA)
Stored in the DataConnection.PrimaryConfiguration JSON dict, parsed by OpcUaEndpointConfigSerializer.FromFlatDict.
| Key | Default | Description |
|---|---|---|
| endpoint / EndpointUrl | opc.tcp://localhost:4840 | OPC UA server endpoint URL. |
| SessionTimeoutMs | 60000 | Session timeout in ms. |
| OperationTimeoutMs | 15000 | Transport operation timeout in ms. |
| PublishingIntervalMs | 1000 | Subscription publishing interval in ms. |
| KeepAliveCount | 10 | Keep-alive frames before session timeout. |
| LifetimeCount | 30 | Subscription lifetime in publish intervals. |
| SamplingIntervalMs | 1000 | Per-item server sampling rate in ms. |
| QueueSize | 10 | Per-item notification buffer size. |
| SecurityMode | None | None, Sign, or SignAndEncrypt. |
| AutoAcceptUntrustedCerts | true | Accept untrusted server certificates. |
Per-connection settings (MxGateway)
Stored in DataConnection.PrimaryConfiguration, parsed by MxGatewayEndpointConfigSerializer.FromFlatDict.
| Key | Default | Description |
|---|---|---|
| Endpoint | http://localhost:5000 | Gateway base URL. |
| ApiKey | (none) | Sent as Authorization: Bearer <key>. Redacted in logs. |
| ClientName | scadabridge | MXAccess client registration name. |
| WriteUserId | 0 | MXAccess user id on writes. 0 = no user context. |
| UseTls | false | Use TLS. |
| CaFile | (none) | CA certificate file path (TLS only). |
| ServerName | (none) | TLS server-name override. |
| ReadTimeoutMs | 5000 | ReadBulk per-call timeout in ms. |
Per-connection failover settings
Carried on CreateConnectionCommand from the deployment artifact.
| Field | Default | Description |
|---|---|---|
| BackupConnectionDetails | null | If absent, the connection retries its single endpoint indefinitely. |
| FailoverRetryCount | 3 | Consecutive failures (or unstable disconnects) before switching endpoints. |
Dependencies & Interactions
- Site Runtime (#3): DeploymentManagerActor sends CreateConnectionCommand / RemoveConnectionCommand to create and tear down connections during instance deployment. Instance Actors send SubscribeTagsRequest / UnsubscribeTagsRequest / WriteTagRequest. NativeAlarmActor (peer to AlarmActor under InstanceActor) sends SubscribeAlarmsRequest / UnsubscribeAlarmsRequest and receives NativeAlarmTransitionUpdate / NativeAlarmSourceUnavailable.
- Commons (#16): owns the IDataConnection, IAlarmSubscribableConnection, and IBrowsableDataConnection interfaces; the TagValue, QualityCode, NativeAlarmTransition, AlarmConditionState, and AlarmTransitionKind types; and all message contracts (SubscribeTagsRequest/Response, TagValueUpdate, WriteTagRequest/Response, SubscribeAlarmsRequest/Response, NativeAlarmTransitionUpdate, NativeAlarmSourceUnavailable, CreateConnectionCommand, BrowseNodeCommand/Result, DataConnectionHealthReport).
- Health Monitoring (#11): DataConnectionActor calls ISiteHealthCollector.UpdateConnectionHealth, UpdateTagResolution, UpdateTagQuality, UpdateConnectionEndpoint, and RemoveConnection to keep the site health report current. DataConnectionManagerActor handles GetAllHealthReports by forwarding GetHealthReport to each child.
- Site Event Logging (#12): DataConnectionActor logs connection lost, restored, and failover events via ISiteEventLogger.LogEventAsync (fire-and-forget). Absent in tests where ISiteEventLogger is null.
- Design spec: Component-DataConnectionLayer.md.
Troubleshooting
Tags stuck in bad quality after reconnect
After a reconnect, ReSubscribeAll re-issues all subscriptions. Tags that resolve successfully immediately receive a seeded value from ReadAsync before the subscription's first change notification arrives, so the Instance Actor sees a good-quality value promptly. Tags that fail resolution land in _unresolvedTags and are retried at TagResolutionRetryInterval. If a tag is persistently bad, check whether the tag path exists on the device and whether the device is online. The health report's ResolvedTags vs TotalSubscribedTags counters expose the gap without requiring the debug view.
Connection not failing over to backup
Failover is driven by two independent counters: _consecutiveFailures for connect-attempt failures, and _consecutiveUnstableDisconnects for connections that drop before StableConnectionThreshold. Each counter is compared against FailoverRetryCount on its own; per the failover rules above, either one reaching the threshold triggers a switch. A connection that succeeds but immediately drops does not increment _consecutiveFailures; it increments _consecutiveUnstableDisconnects instead. Check which counter matches the observed failure pattern, and confirm that BackupConnectionDetails is configured, since without it the connection retries its single endpoint indefinitely. Failover events are written to Site Event Logging as Warning entries.
Browse hangs on "loading…"
The DataConnectionActor caps BrowseNodeResult to ~100 KB before returning it. If the picker hangs rather than showing a "results truncated" hint, the reply may have been silently discarded by Akka remoting before reaching central (the reply crosses the site→central frame). Check whether the connection actor is in Connecting or Reconnecting state — browse while disconnected returns BrowseFailureKind.ConnectionNotConnected immediately rather than hanging.
Alarm feed not receiving transitions
Confirm that the connection's adapter implements IAlarmSubscribableConnection (currently OpcUaDataConnection and MxGatewayDataConnection). A non-capable adapter returns SubscribeAlarmsResponse(Success = false). If the capability is present, check whether the connection is in Connected state — SubscribeAlarmsRequest is stashed while Connecting or Reconnecting and processed on entering Connected. On reconnect, ReSubscribeAllAlarms re-opens the feeds and the adapter replays a snapshot.