Joseph Doherty c5fb02d640 docs(components): accuracy fixes from deep review (batch 1)
Commons (third-party dep, 7 namespaces, retired ApiKey, repo SaveChanges
carve-out), ConfigurationDatabase (5 persisted + 1 non-persisted computed col),
ClusterInfrastructure (abbreviated HOCON note, RemotingPort default),
Host (component matrix: CI/HealthMonitoring/ExternalSystemGateway have no
actors; DeadLetterMonitorActor runs on both roles), Security (Bearer not
X-API-Key; ApiKeyAdmin registered by Host), Communication (Task.Run/Sender).
2026-06-03 16:32:01 -04:00


Central-Site Communication

The Central-Site Communication component is the transport layer that connects the central cluster to every site cluster. It provides two independent transports (Akka.NET ClusterClient for command/control and gRPC server-streaming for real-time data), wired together through a pair of actors that each cluster registers with the ClusterClientReceptionist.

Overview

Communication (#5) runs on every node in every cluster. The component code lives in src/ZB.MOM.WW.ScadaBridge.Communication/, organised as follows:

  • Actors/: CentralCommunicationActor, SiteCommunicationActor, DebugStreamBridgeActor, StreamRelayActor.
  • Grpc/: SiteStreamGrpcServer, SiteStreamGrpcClient, SiteStreamGrpcClientFactory, ISiteStreamSubscriber, and the proto DTO mappers.
  • Protos/sitestream.proto (proto source; generated C# is vendored in SiteStreamGrpc/).
  • CommunicationService.cs — typed Ask-pattern façade used by callers on the central side.
  • DebugStreamService.cs — session manager for debug stream bridge actors.
  • CommunicationOptions.cs — configuration options class.
  • ServiceCollectionExtensions.cs — DI registration (AddCommunication).

DI registration is called from the Host composition root via AddCommunication. The actors themselves are created inside AkkaHostedService.RegisterCentralActors / RegisterSiteActors because they must be created within the actor system, not by the DI container.

Key Concepts

Two transports, two concerns

| Transport | Direction | Purpose |
| --- | --- | --- |
| Akka.NET ClusterClient | bidirectional (command/control) | Deployments, lifecycle, subscribe/unsubscribe handshake, snapshots, heartbeats, health reports, telemetry, notifications |
| gRPC server-streaming (SiteStreamService) | site → central | Real-time attribute value and alarm state changes |

The transports are independent. A gRPC stream interruption does not affect in-flight ClusterClient commands, and vice versa.

Hub-and-spoke topology

Sites do not communicate with each other. All inter-cluster traffic flows through central. Central maintains one ClusterClient per site; each site maintains a single ClusterClient pointed at both central nodes.

SiteEnvelope routing

Central-side callers wrap outbound messages in a SiteEnvelope(SiteId, Message). CentralCommunicationActor resolves the site's ClusterClient by SiteId and forwards the inner message to /user/site-communication on the site:

// CommunicationService.cs — deployment pattern
public async Task<DeploymentStatusResponse> DeployInstanceAsync(
    string siteId, DeployInstanceCommand command, CancellationToken cancellationToken = default)
{
    var envelope = new SiteEnvelope(siteId, command);
    return await GetActor().Ask<DeploymentStatusResponse>(
        envelope, _options.DeploymentTimeout, cancellationToken);
}

CentralCommunicationActor.HandleSiteEnvelope extracts the inner message and routes it via the cached ClusterClient:

private void HandleSiteEnvelope(SiteEnvelope envelope)
{
    if (!_siteClients.TryGetValue(envelope.SiteId, out var entry))
    {
        _log.Warning("No ClusterClient for site {0}, cannot route message {1}",
            envelope.SiteId, envelope.Message.GetType().Name);
        return;  // caller's Ask times out — no central buffering
    }

    entry.Client.Tell(
        new ClusterClient.Send("/user/site-communication", envelope.Message),
        Sender);
}

No central buffering

If a site is unreachable when a command arrives, the caller's Ask times out. Central never queues command/control messages on behalf of a site. This is deliberate: it keeps the central coordinator stateless with respect to site availability and pushes retry responsibility to the operator or to the Store-and-Forward Engine for messages that tolerate it.

Architecture

Central-side: CentralCommunicationActor

CentralCommunicationActor is a ReceiveActor created at /user/central-communication and registered with ClusterClientReceptionist so the site's ClusterClient can locate it. It owns:

  • A Dictionary<string, (IActorRef Client, ImmutableHashSet<string> ContactAddresses)> keyed by site identifier — one ClusterClient per site.
  • A RefreshSiteAddresses periodic timer (60-second cadence, starting immediately). Each tick fires LoadSiteAddressesFromDb, which reads every Site row from the database, parses NodeAAddress and NodeBAddress into Akka receptionist paths ({addr}/system/receptionist), and pipes a SiteAddressCacheLoaded message back to Self. HandleSiteAddressCacheLoaded creates, updates, or stops ClusterClient actors based on the diff.
  • Proxy references to NotificationOutboxActor and AuditLogIngestActor cluster singletons, injected post-construction via RegisterNotificationOutbox / RegisterAuditIngest messages from the Host. Messages that arrive before the proxy is registered are answered with a non-accepted ack (notifications) or an empty reply (audit), so the site retries without data loss.
  • Fanout of SiteHealthReport to the peer central node via DistributedPubSub, keyed on the site-health-replica topic, so both central nodes' aggregators stay in sync regardless of which central node the site's ClusterClient load-balanced the report to.
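
The create/update/stop decision that HandleSiteAddressCacheLoaded makes on each refresh can be pictured as a set difference between the cached clients and the freshly loaded addresses. The following is a minimal sketch of that idea only. SiteClientDiffSketch, Plan, and Compute are hypothetical names, and the real actor works with ClusterClient actor references rather than plain strings.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: not part of the component, only illustrates the diff.
public static class SiteClientDiffSketch
{
    // Site identifiers grouped by the action the refresh would take.
    public sealed record Plan(List<string> Create, List<string> Update, List<string> Stop);

    public static Plan Compute(
        IReadOnlyDictionary<string, HashSet<string>> current,
        IReadOnlyDictionary<string, HashSet<string>> loaded)
    {
        // Sites present in the database but not yet cached need a new client.
        var create = loaded.Keys
            .Where(id => !current.ContainsKey(id))
            .OrderBy(id => id, StringComparer.Ordinal)
            .ToList();

        // Sites cached with a different contact set need their client replaced.
        var update = loaded
            .Where(kv => current.TryGetValue(kv.Key, out var old) && !old.SetEquals(kv.Value))
            .Select(kv => kv.Key)
            .OrderBy(id => id, StringComparer.Ordinal)
            .ToList();

        // Sites cached but no longer in the database have their client stopped.
        var stop = current.Keys
            .Where(id => !loaded.ContainsKey(id))
            .OrderBy(id => id, StringComparer.Ordinal)
            .ToList();

        return new Plan(create, update, stop);
    }
}
```

Sites whose contact set is unchanged appear in none of the three lists, so a refresh that finds no differences leaves every existing client untouched.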

ISiteClientFactory / DefaultSiteClientFactory abstract ClusterClient construction for testability.

Site-side: SiteCommunicationActor

SiteCommunicationActor is a ReceiveActor created at /user/site-communication and registered with ClusterClientReceptionist. It owns:

  • An IActorRef? _centralClient — the site's outbound ClusterClient to central. Injected post-construction via RegisterCentralClient.
  • A Timers-based heartbeat (default 5-second interval, first tick after 1 second). Each tick sends a HeartbeatMessage with IsActive stamped from the Akka Cluster leader check — the node is active when its MemberStatus is Up and it holds cluster leadership.
  • Dispatch to local handlers for every inbound command pattern. Handlers for event-log, parked-message, integration, and artifact patterns are registered post-construction via RegisterLocalHandler; unregistered patterns receive an inline error reply so the central Ask does not stall.

Site-to-central messages (health reports, audit batches, notification submissions) are sent via:

_centralClient.Tell(
    new ClusterClient.Send("/user/central-communication", msg), Sender);

For request/response messages, the original Sender is forwarded as the ClusterClient.Send sender so any reply from central routes straight back to the waiting Ask on the site, not through SiteCommunicationActor. For fire-and-forget messages (e.g. SiteHealthReport), Self is used as the sender instead, because no reply is expected.

Address loading and the 60-second refresh

CentralCommunicationActor calls ISiteRepository.GetAllSitesAsync inside a background Task.Run (to avoid blocking the actor thread on a database round-trip) and pipes the result as SiteAddressCacheLoaded. The actor-lifecycle CancellationTokenSource is threaded into the repository call so a slow MS SQL query is cancelled when the actor stops.

A malformed address for one site does not abort the refresh loop — the actor catches the parse failure, logs a warning, skips that site, and processes the rest. The refresh also runs immediately on startup (TimeSpan.Zero initial delay) so the cache is populated before the first command arrives.
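
The skip-and-continue behaviour can be sketched as a per-site try/catch inside the refresh loop. This is an illustration under stated assumptions, not the actor's code: SiteRow and SiteAddressRefreshSketch are hypothetical, Uri.TryCreate stands in for the Akka address parsing, and treating a single empty node address as acceptable is also an assumption.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a Site row; only the two address columns matter here.
public sealed record SiteRow(string SiteId, string NodeAAddress, string NodeBAddress);

public static class SiteAddressRefreshSketch
{
    // Returns receptionist paths per site, skipping any site whose addresses do not parse.
    public static Dictionary<string, List<string>> BuildContacts(
        IEnumerable<SiteRow> sites, Action<string> logWarning)
    {
        var result = new Dictionary<string, List<string>>();
        foreach (var site in sites)
        {
            try
            {
                var contacts = new List<string>();
                foreach (var addr in new[] { site.NodeAAddress, site.NodeBAddress })
                {
                    if (string.IsNullOrWhiteSpace(addr)) continue; // assumption: one node may be unset
                    // Assumption: Uri.TryCreate substitutes for the real address parser.
                    if (!Uri.TryCreate(addr, UriKind.Absolute, out _))
                        throw new FormatException($"Malformed address '{addr}'");
                    contacts.Add($"{addr}/system/receptionist");
                }

                if (contacts.Count == 0)
                {
                    logWarning($"Site {site.SiteId} has no addresses, skipping");
                    continue;
                }
                result[site.SiteId] = contacts;
            }
            catch (FormatException ex)
            {
                // One bad site must not abort the refresh for the others.
                logWarning($"Site {site.SiteId} skipped: {ex.Message}");
            }
        }
        return result;
    }
}
```

Because the catch sits inside the loop body, a failure for one site costs only that site's entry; every other site is still evaluated on the same tick.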

CommunicationService.RefreshSiteAddresses() triggers an on-demand refresh when a site record is added, edited, or deleted from the Central UI or CLI.

gRPC real-time data transport

Real-time attribute value and alarm state changes are delivered over SiteStreamService, a gRPC server-streaming service defined in sitestream.proto.

Site-side SiteStreamGrpcServer (Kestrel HTTP/2, port 8083):

  • Implements SiteStreamService.SiteStreamServiceBase.
  • For each SubscribeInstance call, creates a StreamRelayActor (named stream-relay-{correlationId}-{seq}) and subscribes it to ISiteStreamSubscriber (implemented by SiteStreamManager in the Site Runtime project — SiteStreamGrpcServer holds only the interface so it does not reference SiteRuntime directly).
  • Bridges events via a BoundedChannel<SiteStreamEvent> (capacity 1000, DropOldest) from the actor thread to the async gRPC write loop.
  • Enforces a GrpcMaxConcurrentStreams limit (default 100) and a GrpcMaxStreamLifetime session timeout (default 4 hours) to evict zombie streams.
  • Validates correlation_id against ActorPath.IsValidPathElement before use in an actor name, rejecting invalid values with StatusCode.InvalidArgument.
  • During CoordinatedShutdown, CancelAllStreams() flips _shuttingDown, refuses new subscriptions with StatusCode.Unavailable, and cancels all active CancellationTokenSources.
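
The bounded, drop-oldest bridge between the actor thread and the gRPC write loop maps onto System.Threading.Channels. The sketch below shows only the channel semantics; DropOldestChannelSketch is a hypothetical name, and the SingleReader/SingleWriter hints are assumptions about the one-relay, one-write-loop pairing described above.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;

// Hypothetical helper: illustrates drop-oldest channel semantics only.
public static class DropOldestChannelSketch
{
    // Creates a bounded channel that evicts the oldest queued item when full,
    // so the producer never blocks on a slow reader.
    public static Channel<T> Create<T>(int capacity) =>
        Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.DropOldest,
            SingleReader = true,  // assumption: one gRPC write loop per stream
            SingleWriter = true   // assumption: one relay actor per stream
        });

    // Writes every item, then drains whatever the channel retained.
    public static List<T> WriteThenDrain<T>(int capacity, IEnumerable<T> items)
    {
        var channel = Create<T>(capacity);
        foreach (var item in items)
            channel.Writer.TryWrite(item); // succeeds even when full; the oldest item is dropped
        channel.Writer.Complete();

        var retained = new List<T>();
        while (channel.Reader.TryRead(out var kept))
            retained.Add(kept);
        return retained;
    }
}
```

With a capacity of 3 and five writes, only the last three items survive. That is the trade-off the server accepts at capacity 1000: a slow consumer loses the oldest updates rather than stalling the actor.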

StreamRelayActor is a lightweight ReceiveActor that converts AttributeValueChanged and AlarmStateChanged domain events to proto SiteStreamEvent messages and writes them to the channel writer:

// StreamRelayActor.cs
private void HandleAttributeValueChanged(AttributeValueChanged msg)
{
    var protoEvent = new SiteStreamEvent
    {
        CorrelationId = _correlationId,
        AttributeChanged = new AttributeValueUpdate
        {
            InstanceUniqueName = msg.InstanceUniqueName,
            AttributePath = msg.AttributePath,
            AttributeName = msg.AttributeName,
            Value = ValueFormatter.FormatDisplayValue(msg.Value),
            Quality = MapQuality(msg.Quality),
            Timestamp = Timestamp.FromDateTimeOffset(msg.Timestamp)
        }
    };
    WriteToChannel(protoEvent);
}

Central-side SiteStreamGrpcClient / SiteStreamGrpcClientFactory:

  • SiteStreamGrpcClientFactory (singleton) caches one SiteStreamGrpcClient per site identifier. On GetOrCreate, it compares the cached client's Endpoint to the requested endpoint and atomically replaces a stale client (different endpoint — NodeA→NodeB failover flip, or an edited address) with a fresh one.
  • SiteStreamGrpcClient opens a GrpcChannel with HTTP/2 keepalive (KeepAlivePingDelay default 15 s, KeepAlivePingTimeout default 10 s, KeepAlivePingPolicy.Always). SubscribeAsync is a plain async Task that calls SubscribeInstance and reads the response stream with await foreach, invoking onEvent for each received event and onError on any non-cancellation exception. The caller (DebugStreamBridgeActor.OpenGrpcStream) launches it inside a Task.Run so the long-running stream loop runs off the actor thread.
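
In .NET the HTTP/2 keepalive settings live on SocketsHttpHandler, and the policy value comes from the HttpKeepAlivePingPolicy enum. The sketch below shows one plausible way to build such a handler from the two configured intervals. KeepAliveHandlerSketch is a hypothetical name, and how SiteStreamGrpcClient actually constructs its channel is not shown in this document.

```csharp
using System;
using System.Net.Http;

// Hypothetical helper: builds a handler with HTTP/2 keepalive PINGs enabled.
public static class KeepAliveHandlerSketch
{
    public static SocketsHttpHandler Create(TimeSpan pingDelay, TimeSpan pingTimeout) =>
        new SocketsHttpHandler
        {
            // Send a PING after this much time without receiving any frames.
            KeepAlivePingDelay = pingDelay,
            // Close the connection if the PING is not acknowledged within this time.
            KeepAlivePingTimeout = pingTimeout,
            // Ping even when no requests are active on the connection.
            KeepAlivePingPolicy = HttpKeepAlivePingPolicy.Always
        };

    // With the Grpc.Net.Client package, the handler would typically be supplied via
    // GrpcChannelOptions.HttpHandler when calling GrpcChannel.ForAddress.
}
```

With the defaults of 15 s and 10 s, a silently dead peer is noticed after roughly 15 + 10 = 25 s, which is consistent with the keepalive row in the dead-client detection table under Configuration.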

Debug stream session lifecycle

DebugStreamService manages one DebugStreamBridgeActor per active debug session. On StartStreamAsync, it resolves the instance's site and gRPC addresses, creates the bridge actor, and holds the session in a ConcurrentDictionary.

DebugStreamBridgeActor (one per session, short-lived, no persistence):

  1. In PreStart, sends SubscribeDebugViewRequest to CentralCommunicationActor (ClusterClient, for the initial snapshot).
  2. On receiving DebugViewSnapshot, fires onEvent(snapshot) and calls OpenGrpcStream.
  3. OpenGrpcStream calls _grpcFactory.GetOrCreate(siteId, endpoint) and launches client.SubscribeAsync(...) as a background task. Domain events are marshalled back to the actor via Self.Tell for thread safety.
  4. On a gRPC error, flips to the other node endpoint and retries (first retry immediate, subsequent retries with ReconnectDelay default 5 s). The retry budget (MaxRetries = 3) is recovered only after StabilityWindow (default 60 s) of uninterrupted connection — a stream that delivers one event then immediately fails does not count as stable.
  5. On StopDebugStream, cancels the gRPC subscription and sends UnsubscribeDebugViewRequest to the site via ClusterClient.
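
The retry rules in step 4 can be modelled as a small state machine. This sketch is one reading of those rules, not the actor's implementation: RetryBudgetSketch and its members are hypothetical, and time is passed in explicitly so the behaviour is easy to check.

```csharp
using System;

// Hypothetical model of the reconnect budget described in step 4.
public sealed class RetryBudgetSketch
{
    private readonly int _maxRetries;
    private readonly TimeSpan _reconnectDelay;
    private readonly TimeSpan _stabilityWindow;
    private int _retriesUsed;
    private DateTimeOffset? _connectedAt;

    // Which node endpoint the next attempt should target.
    public bool UseNodeA { get; private set; } = true;

    public RetryBudgetSketch(int maxRetries, TimeSpan reconnectDelay, TimeSpan stabilityWindow)
    {
        _maxRetries = maxRetries;
        _reconnectDelay = reconnectDelay;
        _stabilityWindow = stabilityWindow;
    }

    // Call when a stream opens successfully.
    public void OnConnected(DateTimeOffset now) => _connectedAt = now;

    // Call when the stream fails. Returns the delay before the next attempt,
    // or null when the budget is exhausted and the session should terminate.
    public TimeSpan? OnError(DateTimeOffset now)
    {
        // Only a connection that stayed up for the whole window restores the budget.
        if (_connectedAt.HasValue && now - _connectedAt.Value >= _stabilityWindow)
            _retriesUsed = 0;
        _connectedAt = null;

        if (_retriesUsed >= _maxRetries)
            return null;

        _retriesUsed++;
        UseNodeA = !UseNodeA; // flip to the other node endpoint
        return _retriesUsed == 1 ? TimeSpan.Zero : _reconnectDelay;
    }
}
```

A stream that connects and fails one second later still consumes a retry, because one second is shorter than the stability window. Only an error that arrives at least StabilityWindow after the last successful connect resets the count.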

Proto definition summary

// Protos/sitestream.proto
service SiteStreamService {
  rpc SubscribeInstance(InstanceStreamRequest) returns (stream SiteStreamEvent);
  rpc IngestAuditEvents(AuditEventBatch) returns (IngestAck);
  rpc IngestCachedTelemetry(CachedTelemetryBatch) returns (IngestAck);
  rpc PullAuditEvents(PullAuditEventsRequest) returns (PullAuditEventsResponse);
}

SubscribeInstance carries the real-time data stream. The other three RPCs (IngestAuditEvents, IngestCachedTelemetry, PullAuditEvents) serve the Audit Log component's gRPC telemetry push and reconciliation pull paths — SiteStreamGrpcServer hosts them on the same port because sites already listen there.

SiteStreamEvent uses a oneof event { AttributeValueUpdate, AlarmStateUpdate } discriminator. AlarmStateUpdate carries the full native alarm condition (fields 8-21) alongside the base computed-alarm fields (1-7), added additively so old clients ignoring unknown fields continue to work.

Usage

Central callers interact through CommunicationService, which wraps each command pattern in a typed Ask with a per-pattern timeout:

| Pattern | Method | Timeout |
| --- | --- | --- |
| Instance deployment | DeployInstanceAsync | 120 s |
| Instance lifecycle | DisableInstanceAsync, EnableInstanceAsync, DeleteInstanceAsync | 30 s |
| Artifact deployment | DeployArtifactsAsync | 60 s |
| Integration routing | RouteIntegrationCallAsync | 30 s |
| Debug snapshot | RequestDebugSnapshotAsync | 30 s |
| Remote queries | QueryEventLogsAsync, QueryParkedMessagesAsync, etc. | 30 s |
| OPC UA tag browse | BrowseNodeAsync | 30 s |
| Notification outbox (central-local) | QueryNotificationOutboxAsync, RetryNotificationAsync, etc. | 30 s |
| Site Call Audit (central-local) | QuerySiteCallsAsync, RetrySiteCallAsync, etc. | 30 s |

Notification Outbox and Site Call Audit actors are central-local singletons — their CommunicationService methods Ask the proxy directly without wrapping in SiteEnvelope.

For real-time streaming, callers use DebugStreamService.StartStreamAsync, which creates a DebugStreamBridgeActor and returns a session handle. Ongoing events arrive via the onEvent callback; session teardown is via StopStreamAsync.

Configuration

All options are bound from the Communication section via CommunicationOptions:

| Key | Default | Description |
| --- | --- | --- |
| DeploymentTimeout | 00:02:00 | Ask timeout for instance deployment commands. |
| LifecycleTimeout | 00:00:30 | Ask timeout for lifecycle commands (disable, enable, delete). |
| ArtifactDeploymentTimeout | 00:01:00 | Ask timeout for system-wide artifact deployment. |
| QueryTimeout | 00:00:30 | Ask timeout for remote queries and management commands. |
| IntegrationTimeout | 00:00:30 | Ask timeout for integration routing and Inbound API routing. |
| DebugViewTimeout | 00:00:10 | Ask timeout for debug subscribe/unsubscribe handshake. |
| NotificationForwardTimeout | 00:00:30 | Ask timeout for notification submission forwarding. |
| CentralContactPoints | [] | Site-side: Akka addresses of central nodes, e.g. akka.tcp://scadabridge@central-a:8081. |
| GrpcKeepAlivePingDelay | 00:00:15 | HTTP/2 keepalive PING interval on SiteStreamGrpcClient. |
| GrpcKeepAlivePingTimeout | 00:00:10 | HTTP/2 keepalive PING timeout. |
| GrpcMaxStreamLifetime | 04:00:00 | Per-stream session timeout; forces reconnect of zombie streams. |
| GrpcMaxConcurrentStreams | 100 | Max concurrent SubscribeInstance streams per site node. |
| TransportHeartbeatInterval | 00:00:05 | SiteCommunicationActor heartbeat cadence. |
| TransportFailureThreshold | 00:00:15 | Akka remoting failure-detection threshold. |

Three layers of dead-client detection protect the gRPC stream path:

| Layer | Detects | Timeline |
| --- | --- | --- |
| TCP RST | Clean process death, connection close | ~15 s |
| gRPC keepalive PING | Network partition, silent crash | ~25 s |
| Session timeout (GrpcMaxStreamLifetime) | Zombie streams with misconfigured keepalive | 4 h |

Dependencies & Interactions

  • Commons (#16) — owns all message contracts used by this component: DeployInstanceCommand, SiteEnvelope, HeartbeatMessage, SiteHealthReport, SiteHealthReportReplica, RegisterNotificationOutbox, RegisterAuditIngest, IngestAuditEventsCommand, IngestCachedTelemetryCommand, and all other request/response records. Commons does not hold an Akka package reference, so RegisterAuditIngest (which carries an IActorRef) lives in this project.
  • Cluster Infrastructure (#13) — provides ClusterClientReceptionist registration and the active/standby leader model that SiteCommunicationActor's IsActive check and CentralCommunicationActor's DistributedPubSub fanout both depend on.
  • Configuration Database (#17) — provides ISiteRepository.GetAllSitesAsync for address loading; site records carry NodeAAddress, NodeBAddress, GrpcNodeAAddress, GrpcNodeBAddress.
  • Deployment Manager (#2): the primary consumer of command/control patterns 1-3. CommunicationService is injected into the Deployment Manager actor to send deployments, lifecycle commands, and artifact deployments to sites.
  • Site Runtime (#3): SiteCommunicationActor forwards inbound commands to the DeploymentManager singleton proxy. SiteStreamManager (in Site Runtime) implements ISiteStreamSubscriber so SiteStreamGrpcServer can subscribe relay actors to instance event feeds without referencing Site Runtime directly.
  • Health Monitoring (#11): CentralCommunicationActor calls ICentralHealthAggregator.MarkHeartbeat and ProcessReport for every inbound heartbeat and health report. DistributedPubSub fanout keeps both central nodes' aggregators in sync.
  • Audit Log (#23): SiteStreamGrpcServer hosts IngestAuditEvents, IngestCachedTelemetry, and PullAuditEvents RPCs. CentralCommunicationActor routes IngestAuditEventsCommand / IngestCachedTelemetryCommand ClusterClient messages to the AuditLogIngestActor proxy.
  • Notification Outbox (#21): CentralCommunicationActor routes NotificationSubmit / NotificationStatusQuery messages from sites to the NotificationOutboxActor proxy. CommunicationService Asks the proxy directly for central-UI outbox management calls.
  • Site Call Audit (#22): CommunicationService Asks the SiteCallAuditActor proxy directly for query and relay operations. SiteCallAuditActor issues RetryParkedOperation / DiscardParkedOperation relay commands to sites via SiteEnvelope; SiteCommunicationActor dispatches them to _parkedMessageHandler.
  • Store-and-Forward Engine (#6) — the site S&F Engine drives NotificationSubmit forwarding and cached-call telemetry emission through SiteCommunicationActor. Parked-message queries and retry/discard relay commands flow back the other way.
  • Management Service (#18): ManagementActor is registered with ClusterClientReceptionist at /user/management on central; the CLI connects via its own separate ClusterClient. This is a distinct ClusterClient usage from the inter-cluster hub-and-spoke connections managed by this component.
  • Design spec: Component-Communication.md.

Troubleshooting

A site's commands fail immediately

Check that NodeAAddress and NodeBAddress are populated in the site configuration. If both are empty, CentralCommunicationActor logs a warning and skips that site on every refresh, so no ClusterClient is created and every command to that site times out. CommunicationService.RefreshSiteAddresses() triggers an on-demand refresh after an address is added.

Commands are timing out but the site is reachable

A single malformed address string for one site can silently prevent ClusterClient creation for that site while other sites are unaffected. Check the logs for a Warning line from HandleSiteAddressCacheLoaded naming the offending site. The actor parse-guard catches the ActorPath.Parse exception per-site so the rest of the refresh proceeds.

A Warning at the Status.Failure handler in CentralCommunicationActor means LoadSiteAddressesFromDb itself threw (typically a SQL connection error); the cache is left stale until the next successful refresh.

gRPC debug stream drops immediately after opening

SiteStreamGrpcServer rejects correlation_id values that contain characters invalid in Akka actor names (/, whitespace, etc.) with StatusCode.InvalidArgument. Verify that the calling DebugStreamBridgeActor generates a safe correlation ID.

After a site node failover, the DebugStreamBridgeActor attempts to reconnect to the other node endpoint (_useNodeA flips on each error). If both nodes are unreachable, the actor exhausts its 3-retry budget and calls onTerminated. The engineer must restart the debug session.

Heartbeats arrive but health reports do not

SiteCommunicationActor sends heartbeats and health reports via separate paths. Health reports are sent only when the site's HealthReportSender publishes them (every 30 s by default). If heartbeats arrive but reports do not, the health-report sender on the site may have faulted — check site-side logs for errors in HealthReportSender.