Joseph Doherty 25bae4e43b docs(components): accuracy fixes from deep review (batch 2)
TemplateEngine (alarm-script-ref ordering, native-alarm-sources not in
revision hash, composition cycle checks, 9-step pipeline), SiteRuntime
(alarm on-trigger scripts run with a restricted context; PreStart seeds
children from defaults before overrides arrive), DataConnectionLayer
(UnsubscribeAlarmsRequest stashed in Connecting), StoreAndForward (InFlight/
Delivered are dead enum values; notifications can park at 50 retries),
ExternalSystemGateway (CachedWrite returns void + enqueues directly; log levels).
2026-06-03 16:34:37 -04:00

Data Connection Layer

The Data Connection Layer is the site-only clean data pipe that abstracts protocol-specific communication behind a uniform actor/interface model, delivering live tag values and native alarm transitions to Instance Actors while hiding all connection lifecycle complexity from the rest of the system.

Overview

The Data Connection Layer (#4) runs exclusively on site nodes. Central never reads from or writes to physical devices directly. The component code lives in src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/, with adapters in Adapters/ and the actor hierarchy in Actors/.

Key collaborators are:

  • DataConnectionManagerActor — the site-level router. One actor per site, child of the Site Runtime hierarchy. It owns the connectionName → IActorRef map and routes every inbound message to the right DataConnectionActor.
  • DataConnectionActor — one per configured data connection. Owns a single IDataConnection adapter and models the full connection lifecycle as a Become/Stash state machine.
  • IDataConnection (Commons) — the protocol adapter contract. Implemented by OpcUaDataConnection and MxGatewayDataConnection.
  • DataConnectionFactory / IDataConnectionFactory — resolves a case-insensitive ProtocolType string to a fresh IDataConnection instance.

DI registration is through ServiceCollectionExtensions.AddDataConnectionLayer, which binds DataConnectionOptions from the DataConnectionLayer section and OpcUaGlobalOptions from the OpcUa section. The actors themselves are created inside the ActorSystem, not by DI.

Key Concepts

Clean data pipe

The DCL performs no alarm evaluation, no trigger evaluation, and no business logic. Its only job is to subscribe to tag paths requested by Instance Actors, deliver TagValueUpdate messages when values change, and route write requests down to the device. Every value change follows a single path: device → adapter callback → DataConnectionActor → Instance Actor. The DCL never publishes to any actor other than the Instance Actors that subscribed.

Become/Stash lifecycle state machine

DataConnectionActor uses Akka.NET's Become/Stash pattern to model three states cleanly:

| State | Behaviour |
| --- | --- |
| Connecting | Stashes SubscribeTagsRequest, WriteTagRequest, UnsubscribeTagsRequest, SubscribeAlarmsRequest, UnsubscribeAlarmsRequest. Non-blocking for ReadTagValuesCommand (immediate synchronous failure reply) and BrowseNodeCommand (async failure via PipeTo when the adapter is IBrowsableDataConnection; immediate synchronous reply otherwise). |
| Connected | Processes all message types. On entering, calls Stash.UnstashAll(). |
| Reconnecting | Stashes new subscribe/write requests; allows UnsubscribeTagsRequest and UnsubscribeAlarmsRequest through for cleanup on instance stop. |

BecomeConnecting fires immediately in PreStart. BecomeConnected calls Stash.UnstashAll(), so subscribe requests that arrived while the connection was still being established (the instance startup race) are processed without loss. The actor declares the IWithStash and IWithTimers interfaces; the Akka.NET infrastructure populates the corresponding Stash and Timers properties.
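The stash rules in the state table can be modelled without Akka.NET. The following is a minimal sketch, not the actor's code: the Request record, the StashingStateMachineSketch class, and the FIFO queue standing in for the Akka stash are all hypothetical names introduced for illustration.

```csharp
using System;
using System.Collections.Generic;

public enum ConnectionState { Connecting, Connected, Reconnecting }

// Hypothetical message shape: only the distinction the state table cares about.
public sealed record Request(string Name, bool IsUnsubscribe = false);

public sealed class StashingStateMachineSketch
{
    // Stands in for the Akka.NET stash; preserves arrival order.
    private readonly Queue<Request> _stash = new();

    public ConnectionState State { get; private set; } = ConnectionState.Connecting;
    public List<string> Processed { get; } = new();

    public void Receive(Request request)
    {
        switch (State)
        {
            case ConnectionState.Connected:
                Processed.Add(request.Name);
                break;
            case ConnectionState.Reconnecting when request.IsUnsubscribe:
                Processed.Add(request.Name);   // cleanup is let through
                break;
            default:
                _stash.Enqueue(request);       // deferred until Connected
                break;
        }
    }

    public void BecomeConnected()
    {
        State = ConnectionState.Connected;
        while (_stash.Count > 0)               // plays the role of Stash.UnstashAll()
            Receive(_stash.Dequeue());
    }

    public void BecomeReconnecting() => State = ConnectionState.Reconnecting;
}
```

In this model, requests received while Connecting are replayed in arrival order on BecomeConnected, and only unsubscribe requests bypass the queue while Reconnecting.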

Adapter generation guard

Each DataConnectionActor holds an _adapterGeneration integer. When a new IDataConnection is created on failover, the generation is incremented. All subscription callbacks capture the generation at the time they are registered; a TagValueReceived or AlarmTransitionReceived whose generation does not match _adapterGeneration is silently dropped. This ensures stale callbacks from a disposed OPC UA SDK session or a closed gRPC stream never reach Instance Actors.
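The guard can be pictured as a closure that captures the generation at registration time. This is a simplified sketch under stated assumptions: GenerationGuardSketch and its members are hypothetical, and the real actor marshals the callback through its mailbox rather than handling it inline.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the actor's mailbox message.
public sealed record TagValueReceived(string TagPath, object? Value, int Generation);

public sealed class GenerationGuardSketch
{
    private int _adapterGeneration;

    public List<TagValueReceived> Delivered { get; } = new();

    // Called when a subscription is registered: the callback closes over
    // the generation that was current at registration time.
    public Action<string, object?> CreateCallback()
    {
        var generation = _adapterGeneration;
        return (tagPath, value) => Handle(new TagValueReceived(tagPath, value, generation));
    }

    // Called on failover, when a fresh adapter replaces the old one.
    public void OnAdapterReplaced() => _adapterGeneration++;

    private void Handle(TagValueReceived message)
    {
        if (message.Generation != _adapterGeneration)
            return;                 // stale callback from a disposed adapter: drop silently
        Delivered.Add(message);     // current generation: forward to subscribers
    }
}
```

A callback created before OnAdapterReplaced keeps firing harmlessly: its captured generation no longer matches, so nothing reaches the subscribers.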

Protocol extensibility

Adding a new protocol requires implementing IDataConnection (and optionally IBrowsableDataConnection and/or IAlarmSubscribableConnection) and calling DataConnectionFactory.RegisterAdapter with the new ProtocolType string. The manager and the actor are protocol-agnostic — they always work through the interfaces.
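As a rough illustration of the registration step, the sketch below keeps adapter constructors in a case-insensitive dictionary. The types here (IDataConnectionSketch, ModbusConnectionSketch, DataConnectionFactorySketch) and the delegate-based RegisterAdapter signature are assumptions made for the example; the real DataConnectionFactory signature is not shown in this document.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, trimmed-down adapter contract for illustration only.
public interface IDataConnectionSketch
{
    string ProtocolType { get; }
}

// Hypothetical new protocol adapter.
public sealed class ModbusConnectionSketch : IDataConnectionSketch
{
    public ModbusConnectionSketch(IDictionary<string, string> details) { }
    public string ProtocolType => "Modbus";
}

public sealed class DataConnectionFactorySketch
{
    // OrdinalIgnoreCase makes "modbus", "Modbus" and "MODBUS" resolve identically.
    private readonly Dictionary<string, Func<IDictionary<string, string>, IDataConnectionSketch>> _creators
        = new(StringComparer.OrdinalIgnoreCase);

    public void RegisterAdapter(
        string protocolType,
        Func<IDictionary<string, string>, IDataConnectionSketch> creator)
        => _creators[protocolType] = creator;

    public IDataConnectionSketch Create(string protocolType, IDictionary<string, string> details)
    {
        if (!_creators.TryGetValue(protocolType, out var creator))
            throw new ArgumentException($"Unknown protocol type '{protocolType}'.", nameof(protocolType));
        return creator(details);   // a fresh adapter instance on every call
    }
}
```

Returning a new instance per call matters for failover, where the actor discards the old adapter and asks the factory for a replacement.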

Architecture

Actor hierarchy

DataConnectionManagerActor          (one per site; child of DeploymentManagerActor)
├── DataConnectionActor  "OpcSrv1"  (one per configured connection)
├── DataConnectionActor  "Gateway2"
└── ...

DataConnectionManagerActor is a ReceiveActor. It routes by connection name: HandleRoute calls actor.Forward(request) when the connection exists or sends a typed failure reply when it does not. The manager owns ConnectionNotFound failures; DataConnectionActor owns everything else (not connected, server errors, capability checks). The manager's SupervisorStrategy is Resume with up to ten restarts per minute, so a transient exception on a child never collapses the routing table.

CreateConnectionCommand (from the Site Runtime's DeploymentManagerActor) triggers HandleCreateConnection. The actor name is sanitised from the connection's display name to satisfy Akka's path character constraints.
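The exact sanitisation rule is not given here. One conservative possibility, shown purely as an assumption, is to keep ASCII letters, digits, '-' and '_' and replace everything else with '_'. ActorNameSanitizerSketch is a hypothetical helper, and the character set is deliberately narrower than what Akka permits.

```csharp
using System;
using System.Text;

public static class ActorNameSanitizerSketch
{
    // Assumed rule: keep a conservative subset of characters and
    // replace anything else with an underscore.
    public static string Sanitize(string displayName)
    {
        if (string.IsNullOrWhiteSpace(displayName))
            throw new ArgumentException("Connection name must not be empty.", nameof(displayName));

        var builder = new StringBuilder(displayName.Length);
        foreach (var ch in displayName)
        {
            var allowed =
                (ch >= 'a' && ch <= 'z') ||
                (ch >= 'A' && ch <= 'Z') ||
                (ch >= '0' && ch <= '9') ||
                ch == '-' || ch == '_';
            builder.Append(allowed ? ch : '_');
        }
        return builder.ToString();
    }
}
```

Under this rule two display names that differ only in punctuation map to the same actor name, so a real implementation would also need a collision policy.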

Connection state machine

// DataConnectionActor.cs — PreStart and state transitions
protected override void PreStart()
{
    _self = Self;                                  // capture for off-thread use
    _adapter.Disconnected += OnAdapterDisconnected;
    BecomeConnecting();
}

private void OnAdapterDisconnected()
{
    // fired from a background thread (gRPC stream, OPC UA keep-alive timer)
    _self.Tell(new AdapterDisconnected());         // marshal onto actor loop
}

private void BecomeConnecting()
{
    _healthCollector.UpdateConnectionHealth(_connectionName, ConnectionHealth.Connecting);
    Become(Connecting);
    Self.Tell(new AttemptConnect());
}

private void BecomeConnected()
{
    _lastConnectedAt = DateTimeOffset.UtcNow;
    _healthCollector.UpdateConnectionHealth(_connectionName, ConnectionHealth.Connected);
    Become(Connected);
    Stash.UnstashAll();
}

private void BecomeReconnecting()
{
    _healthCollector.UpdateConnectionHealth(_connectionName, ConnectionHealth.Disconnected);
    Become(Reconnecting);
    PushBadQualityForAllTags();      // immediate bad quality — Instance Actors see it now
    PushAlarmSourceUnavailable();    // native alarm subscribers notified
    Timers.StartSingleTimer("reconnect", new AttemptConnect(), _options.ReconnectInterval);
}

AttemptConnect fires _adapter.ConnectAsync(...) and pipes ConnectResult back to the actor via PipeTo. On success, BecomeConnected is called; on failure, _consecutiveFailures is incremented and the reconnect timer is re-armed.

Transparent re-subscribe

ReSubscribeAll runs immediately after a successful reconnect, before BecomeConnected. It derives the tag list from _subscriptionsByInstance (the durable record of what every Instance Actor asked for) rather than from _subscriptionIds (which are adapter handles cleared on every disconnect). All in-flight and unresolved sets are cleared so the new adapter starts clean. Subscriptions are re-issued in parallel via PipeTo, so the actor is never blocked during re-subscribe.

// DataConnectionActor.cs — HandleReconnectResult
if (result.Success)
{
    _consecutiveFailures = 0;
    ReSubscribeAll();        // tag subscriptions
    ReSubscribeAllAlarms();  // native alarm feeds
    BecomeConnected();
}

Failover between primary and backup endpoints

When BackupConnectionDetails is provided, the actor tracks consecutive failures and unstable disconnects. After FailoverRetryCount consecutive failures (or unstable connections shorter than StableConnectionThreshold), the actor disposes the current adapter, creates a fresh one with the other endpoint's config via DataConnectionFactory.Create, and re-arms the connect timer. Failover is round-robin: primary → backup → primary. There is no auto-failback; the connection stays on whichever endpoint is currently working.
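The decision logic can be sketched as two counters compared against the same threshold. This is an illustration only: FailoverTrackerSketch is hypothetical, and it assumes that either counter reaching FailoverRetryCount triggers a switch, that both counters reset after a switch, and that a stable connection resets the unstable-disconnect counter.

```csharp
using System;

public sealed class FailoverTrackerSketch
{
    private readonly int _failoverRetryCount;
    private readonly TimeSpan _stableConnectionThreshold;
    private readonly bool _hasBackup;
    private int _consecutiveFailures;
    private int _consecutiveUnstableDisconnects;

    public FailoverTrackerSketch(int failoverRetryCount, TimeSpan stableConnectionThreshold, bool hasBackup)
    {
        _failoverRetryCount = failoverRetryCount;
        _stableConnectionThreshold = stableConnectionThreshold;
        _hasBackup = hasBackup;
    }

    public bool UsingBackup { get; private set; }

    public void OnConnected() => _consecutiveFailures = 0;

    // Returns true when the caller should switch to the other endpoint.
    public bool OnConnectFailed()
    {
        _consecutiveFailures++;
        return SwitchIfThresholdReached();
    }

    // Returns true when the caller should switch to the other endpoint.
    public bool OnDisconnected(TimeSpan connectedFor)
    {
        if (connectedFor < _stableConnectionThreshold)
            _consecutiveUnstableDisconnects++;
        else
            _consecutiveUnstableDisconnects = 0;   // assumption: a stable run clears the count
        return SwitchIfThresholdReached();
    }

    private bool SwitchIfThresholdReached()
    {
        if (!_hasBackup)
            return false;                          // single endpoint: keep retrying it
        if (_consecutiveFailures < _failoverRetryCount &&
            _consecutiveUnstableDisconnects < _failoverRetryCount)
            return false;

        UsingBackup = !UsingBackup;                // round-robin, no auto-failback
        _consecutiveFailures = 0;
        _consecutiveUnstableDisconnects = 0;
        return true;
    }
}
```

With FailoverRetryCount = 3, the third consecutive failed connect flips the endpoint, and three more failures on the backup flip it back to primary.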

Write path

Writes are fire-and-forget only in the sense that a failed write is never queued for later replay; the script still receives the outcome of every write. The DataConnectionActor pipes the result of _adapter.WriteAsync back to the original sender:

// DataConnectionActor.cs — HandleWrite
private void HandleWrite(WriteTagRequest request)
{
    var sender = Sender;
    var cts = new CancellationTokenSource(_options.WriteTimeout);

    _adapter.WriteAsync(request.TagPath, request.Value, cts.Token).ContinueWith(t =>
    {
        cts.Dispose();
        if (t.IsCompletedSuccessfully)
            return new WriteTagResponse(request.CorrelationId, t.Result.Success,
                t.Result.ErrorMessage, DateTimeOffset.UtcNow);
        if (t.IsCanceled || t.Exception?.GetBaseException() is OperationCanceledException)
            return new WriteTagResponse(request.CorrelationId, false,
                $"Write timeout after {_options.WriteTimeout.TotalSeconds:F0}s", DateTimeOffset.UtcNow);
        return new WriteTagResponse(request.CorrelationId, false,
            t.Exception?.GetBaseException().Message, DateTimeOffset.UtcNow);
    }).PipeTo(sender);
}

WriteTagResponse is returned to the calling script through the Instance Actor, which awaits it. There is no store-and-forward for writes, because buffering stale setpoints for later replay is unsafe in a control context.

Tag path resolution retry

When _adapter.SubscribeAsync throws a resolution-level exception (node not found, device still booting), the tag is added to _unresolvedTags and marked QualityCode.Bad. A periodic RetryTagResolution timer fires at TagResolutionRetryInterval. Only tags that are not already in _resolutionInFlight are dispatched on each tick, preventing duplicate concurrent subscribe calls for a slow device. When resolution succeeds, the tag moves from _unresolvedTags to _subscriptionIds, _resolvedTags is incremented, and the timer is cancelled once the set is empty.

A separate in-flight set, _subscribesInFlight, tracks the initial SubscribeAsync for newly requested tags. When two SubscribeTagsRequest messages for different instances share a tag path, the second observes the tag as already being handled, so only one adapter subscribe is issued.
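The retry tick and its in-flight guard can be sketched as two sets. This is a minimal model, assuming hypothetical names (ResolutionRetrySketch, OnRetryTick, OnResolutionCompleted); the real actor dispatches SubscribeAsync and receives the result through PipeTo.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public sealed class ResolutionRetrySketch
{
    private readonly HashSet<string> _unresolvedTags = new(StringComparer.Ordinal);
    private readonly HashSet<string> _resolutionInFlight = new(StringComparer.Ordinal);

    public void MarkUnresolved(string tagPath) => _unresolvedTags.Add(tagPath);

    // Timer tick: returns only the tags that have no subscribe call outstanding.
    public IReadOnlyList<string> OnRetryTick()
    {
        var toDispatch = _unresolvedTags
            .Where(tag => !_resolutionInFlight.Contains(tag))
            .ToList();
        foreach (var tag in toDispatch)
            _resolutionInFlight.Add(tag);
        return toDispatch;
    }

    // Returns true when no unresolved tags remain, i.e. the timer can be cancelled.
    public bool OnResolutionCompleted(string tagPath, bool success)
    {
        _resolutionInFlight.Remove(tagPath);
        if (success)
            _unresolvedTags.Remove(tagPath);
        return _unresolvedTags.Count == 0;
    }
}
```

A slow device therefore sees at most one outstanding subscribe per tag, however many ticks elapse before it answers.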

Tag subscriber reference counting

_tagSubscriberCount maps each tag path to the number of instances subscribed to it. When an instance unsubscribes, the count is decremented. The adapter UnsubscribeAsync call and the quality / resolution counters are updated only when the count reaches zero. This means a shared tag path (subscribed by two different instances bound to the same connection) remains active at the adapter until both instances stop.
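A minimal sketch of the counting rule follows. TagSubscriberCountSketch and its method names are hypothetical; the boolean return values stand in for the actor's decision to call the adapter.

```csharp
using System;
using System.Collections.Generic;

public sealed class TagSubscriberCountSketch
{
    private readonly Dictionary<string, int> _tagSubscriberCount = new(StringComparer.Ordinal);

    // Returns true when this is the first subscriber,
    // i.e. an adapter subscribe is needed.
    public bool AddSubscriber(string tagPath)
    {
        _tagSubscriberCount.TryGetValue(tagPath, out var count);   // count is 0 when absent
        _tagSubscriberCount[tagPath] = count + 1;
        return count == 0;
    }

    // Returns true when the last subscriber has left,
    // i.e. an adapter unsubscribe is needed.
    public bool RemoveSubscriber(string tagPath)
    {
        if (!_tagSubscriberCount.TryGetValue(tagPath, out var count))
            return false;                                          // unknown tag: nothing to do
        if (count > 1)
        {
            _tagSubscriberCount[tagPath] = count - 1;
            return false;
        }
        _tagSubscriberCount.Remove(tagPath);
        return true;
    }
}
```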

Usage

Creating a connection

The Site Runtime's DeploymentManagerActor sends CreateConnectionCommand to DataConnectionManagerActor during instance deployment:

// Commons/Messages/DataConnection/CreateConnectionCommand.cs
public record CreateConnectionCommand(
    string ConnectionName,
    string ProtocolType,
    IDictionary<string, string> PrimaryConnectionDetails,
    IDictionary<string, string>? BackupConnectionDetails = null,
    int FailoverRetryCount = 3);

DataConnectionManagerActor.HandleCreateConnection calls DataConnectionFactory.Create(ProtocolType, PrimaryConnectionDetails) to produce the initial IDataConnection adapter and spawns a DataConnectionActor child.

Subscribing and receiving values

Instance Actors send SubscribeTagsRequest and receive TagValueUpdate messages. The actor sends TagValueUpdate to the Instance Actor's ref for every value change notification from the adapter. On disconnect, ConnectionQualityChanged (with QualityCode.Bad) is sent to all subscribers so Instance Actors reflect staleness immediately without waiting for a per-tag callback.

Browse (design-time only)

BrowseNodeCommand is routed to the named DataConnectionActor. If the adapter implements IBrowsableDataConnection, the actor calls BrowseChildrenAsync and pipes BrowseNodeResult back. The result is capped to ~100 KB before reply to stay within the Akka remote frame budget on the site→central crossing. Browse is never called on the hot path and Instance Actors never use it.

Native alarm subscription

NativeAlarmActor (Site Runtime) sends SubscribeAlarmsRequest to the DataConnectionManagerActor, which forwards it to the named DataConnectionActor:

// DataConnectionActor.cs — HandleSubscribeAlarms (key excerpt)
if (_adapter is not IAlarmSubscribableConnection alarmable)
{
    subscriber.Tell(new SubscribeAlarmsResponse(
        request.CorrelationId, request.InstanceUniqueName, false,
        $"Connection '{_connectionName}' is not alarm-capable.", now));
    return;
}
// register the subscriber for routing before issuing the adapter call
_alarmSourceSubscribers[request.SourceReference].Add(subscriber);

// open one feed per source reference; subsequent subscribers reuse it
alarmable.SubscribeAlarmsAsync(sourceRef, filter,
        t => self.Tell(new AlarmTransitionReceived(t, generation)))
    .ContinueWith(task => ...)
    .PipeTo(self);

Incoming AlarmTransitionReceived is routed to all subscribers whose registered SourceReference is a prefix of the transition's SourceObjectReference or SourceReference. On disconnect, PushAlarmSourceUnavailable sends NativeAlarmSourceUnavailable to every alarm subscriber; on reconnect, ReSubscribeAllAlarms re-opens the feeds and the adapter replays a snapshot of currently-active conditions.
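The prefix routing can be sketched as follows. This assumes a plain ordinal string-prefix comparison and uses strings in place of actor references; AlarmRoutingSketch and ResolveSubscribers are hypothetical names, and the real matching rule may be stricter (for example, matching only on path-segment boundaries).

```csharp
using System;
using System.Collections.Generic;

public static class AlarmRoutingSketch
{
    // Returns every subscriber whose registered source reference is a prefix of
    // either reference carried by the transition, without duplicates.
    public static List<string> ResolveSubscribers(
        IReadOnlyDictionary<string, List<string>> subscribersBySource,
        string sourceObjectReference,
        string sourceReference)
    {
        var matched = new List<string>();
        var seen = new HashSet<string>(StringComparer.Ordinal);

        foreach (var (registered, subscribers) in subscribersBySource)
        {
            var isPrefix =
                sourceObjectReference.StartsWith(registered, StringComparison.Ordinal) ||
                sourceReference.StartsWith(registered, StringComparison.Ordinal);
            if (!isPrefix)
                continue;

            foreach (var subscriber in subscribers)
                if (seen.Add(subscriber))
                    matched.Add(subscriber);
        }
        return matched;
    }
}
```

Note that a plain prefix test also matches sibling names such as "Area1.Tank2" against a registration for "Area1.Tank", which is one reason to treat the comparison rule here as an assumption.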

Configuration

All settings under DataConnectionLayer in appsettings.json bind to DataConnectionOptions. Global protocol settings (OpcUa, MxGateway sections) bind to OpcUaGlobalOptions and MxGatewayGlobalOptions.

Shared settings (DataConnectionLayer section)

| Key | Default | Description |
| --- | --- | --- |
| ReconnectInterval | 00:00:05 | Fixed interval between reconnection attempts after a disconnect or failed connect. |
| TagResolutionRetryInterval | 00:00:10 | Retry interval for tag paths that could not be resolved on the device. |
| WriteTimeout | 00:00:30 | Timeout applied to each WriteAsync call. A hung write times out and returns an error to the calling script. |
| StableConnectionThreshold | 00:01:00 | A connection that drops before this duration is counted as an unstable disconnect toward FailoverRetryCount. |

OPC UA global settings (OpcUa section)

| Key | Default | Description |
| --- | --- | --- |
| ApplicationName | ScadaBridge-DCL | Application name used in the OPC UA certificate and session negotiation. |
| TrustedIssuerStorePath | "" | File path to the trusted issuer certificate store. |
| TrustedPeerStorePath | "" | File path to the trusted peer certificate store. |
| RejectedCertificateStorePath | "" | File path to the rejected certificate store. |

Empty store paths fall back to a temp-path default so dev runs work without explicit configuration.

Per-connection settings (OPC UA)

Stored in the DataConnection.PrimaryConfiguration JSON dict, parsed by OpcUaEndpointConfigSerializer.FromFlatDict.

| Key | Default | Description |
| --- | --- | --- |
| endpoint / EndpointUrl | opc.tcp://localhost:4840 | OPC UA server endpoint URL. |
| SessionTimeoutMs | 60000 | Session timeout in ms. |
| OperationTimeoutMs | 15000 | Transport operation timeout in ms. |
| PublishingIntervalMs | 1000 | Subscription publishing interval in ms. |
| KeepAliveCount | 10 | Keep-alive frames before session timeout. |
| LifetimeCount | 30 | Subscription lifetime in publish intervals. |
| SamplingIntervalMs | 1000 | Per-item server sampling rate in ms. |
| QueueSize | 10 | Per-item notification buffer size. |
| SecurityMode | None | None, Sign, or SignAndEncrypt. |
| AutoAcceptUntrustedCerts | true | Accept untrusted server certificates. |

Per-connection settings (MxGateway)

Stored in DataConnection.PrimaryConfiguration, parsed by MxGatewayEndpointConfigSerializer.FromFlatDict.

| Key | Default | Description |
| --- | --- | --- |
| Endpoint | http://localhost:5000 | Gateway base URL. |
| ApiKey | | Sent as Authorization: Bearer <key>. Redacted in logs. |
| ClientName | scadabridge | MXAccess client registration name. |
| WriteUserId | 0 | MXAccess user id on writes. 0 = no user context. |
| UseTls | false | Use TLS. |
| CaFile | | CA certificate file path (TLS only). |
| ServerName | | TLS server-name override. |
| ReadTimeoutMs | 5000 | ReadBulk per-call timeout in ms. |

Per-connection failover settings

Carried on CreateConnectionCommand from the deployment artifact.

| Field | Default | Description |
| --- | --- | --- |
| BackupConnectionDetails | null | If absent, the connection retries its single endpoint indefinitely. |
| FailoverRetryCount | 3 | Consecutive failures (or unstable disconnects) before switching endpoints. |

Dependencies & Interactions

  • Site Runtime (#3): DeploymentManagerActor sends CreateConnectionCommand / RemoveConnectionCommand to create and tear down connections during instance deployment. Instance Actors send SubscribeTagsRequest / UnsubscribeTagsRequest / WriteTagRequest. NativeAlarmActor (peer to AlarmActor under InstanceActor) sends SubscribeAlarmsRequest / UnsubscribeAlarmsRequest and receives NativeAlarmTransitionUpdate / NativeAlarmSourceUnavailable.
  • Commons (#16): owns IDataConnection, IAlarmSubscribableConnection, IBrowsableDataConnection interfaces; TagValue, QualityCode, NativeAlarmTransition, AlarmConditionState, AlarmTransitionKind types; and all message contracts (SubscribeTagsRequest/Response, TagValueUpdate, WriteTagRequest/Response, SubscribeAlarmsRequest/Response, NativeAlarmTransitionUpdate, NativeAlarmSourceUnavailable, CreateConnectionCommand, BrowseNodeCommand/Result, DataConnectionHealthReport).
  • Health Monitoring (#11): DataConnectionActor calls ISiteHealthCollector.UpdateConnectionHealth, UpdateTagResolution, UpdateTagQuality, UpdateConnectionEndpoint, and RemoveConnection to keep the site health report current. DataConnectionManagerActor handles GetAllHealthReports by forwarding GetHealthReport to each child.
  • Site Event Logging (#12): DataConnectionActor logs connection lost, restored, and failover events via ISiteEventLogger.LogEventAsync (fire-and-forget). Logging is skipped in tests where ISiteEventLogger is null.
  • Design spec: Component-DataConnectionLayer.md.

Troubleshooting

Tags stuck in bad quality after reconnect

After a reconnect, ReSubscribeAll re-issues all subscriptions. Tags that resolve successfully immediately receive a seeded value from ReadAsync before the subscription's first change notification arrives, so the Instance Actor sees a good-quality value promptly. Tags that fail resolution land in _unresolvedTags and are retried at TagResolutionRetryInterval. If a tag is persistently bad, check whether the tag path exists on the device and whether the device is online. The health report's ResolvedTags vs TotalSubscribedTags counters expose the gap without requiring the debug view.

Connection not failing over to backup

Failover is driven by two independent counters: _consecutiveFailures for connect-attempt failures, and _consecutiveUnstableDisconnects for connections that drop before StableConnectionThreshold. Either counter reaching FailoverRetryCount triggers a switch. A connection that succeeds but immediately drops does not increment _consecutiveFailures; it increments _consecutiveUnstableDisconnects instead. Check which counter matches the observed failure pattern, and confirm that BackupConnectionDetails is configured, since without a backup the connection retries its single endpoint indefinitely. Failover events are written to Site Event Logging as Warning entries.

Browse hangs on "loading…"

The DataConnectionActor caps BrowseNodeResult to ~100 KB before returning it. If the picker hangs rather than showing a "results truncated" hint, the reply may have been silently discarded by Akka remoting before reaching central (the reply must fit within the Akka remote frame budget on the site→central crossing). Also check whether the connection actor is in the Connecting or Reconnecting state: browse while disconnected returns BrowseFailureKind.ConnectionNotConnected immediately rather than hanging.

Alarm feed not receiving transitions

Confirm that the connection's adapter implements IAlarmSubscribableConnection (currently OpcUaDataConnection and MxGatewayDataConnection). A non-capable adapter returns SubscribeAlarmsResponse(Success = false). If the capability is present, check whether the connection is in Connected state — SubscribeAlarmsRequest is stashed while Connecting or Reconnecting and processed on entering Connected. On reconnect, ReSubscribeAllAlarms re-opens the feeds and the adapter replays a snapshot.