Central–Site Communication
The Central–Site Communication component is the transport layer that connects the central cluster to every site cluster. It provides two independent transports — Akka.NET ClusterClient for command/control and gRPC server-streaming for real-time data — wired together through a pair of actors that each cluster registers with the ClusterClientReceptionist.
Overview
Communication (#5) runs on every node in every cluster. The component code lives in src/ZB.MOM.WW.ScadaBridge.Communication/, organised as follows:
- Actors/: CentralCommunicationActor, SiteCommunicationActor, DebugStreamBridgeActor, StreamRelayActor.
- Grpc/: SiteStreamGrpcServer, SiteStreamGrpcClient, SiteStreamGrpcClientFactory, ISiteStreamSubscriber, and the proto DTO mappers.
- Protos/: sitestream.proto (proto source; the generated C# is vendored in SiteStreamGrpc/).
- CommunicationService.cs: typed Ask-pattern façade used by callers on the central side.
- DebugStreamService.cs: session manager for debug stream bridge actors.
- CommunicationOptions.cs: configuration options class.
- ServiceCollectionExtensions.cs: DI registration (AddCommunication).
DI registration is called from the Host composition root via AddCommunication. The actors themselves are created inside AkkaHostedService.RegisterCentralActors / RegisterSiteActors because they must be created within the actor system, not by the DI container.
Key Concepts
Two transports, two concerns
| Transport | Direction | Purpose |
|---|---|---|
| Akka.NET ClusterClient | bidirectional (command/control) | Deployments, lifecycle, subscribe/unsubscribe handshake, snapshots, heartbeats, health reports, telemetry, notifications |
| gRPC server-streaming (SiteStreamService) | site → central | Real-time attribute value and alarm state changes |
The transports are independent. A gRPC stream interruption does not affect in-flight ClusterClient commands, and vice versa.
Hub-and-spoke topology
Sites do not communicate with each other. All inter-cluster traffic flows through central. Central maintains one ClusterClient per site; each site maintains a single ClusterClient pointed at both central nodes.
SiteEnvelope routing
Central-side callers wrap outbound messages in a SiteEnvelope(SiteId, Message). CentralCommunicationActor resolves the site's ClusterClient by SiteId and forwards the inner message to /user/site-communication on the site:
```csharp
// CommunicationService.cs: deployment pattern
public async Task<DeploymentStatusResponse> DeployInstanceAsync(
    string siteId, DeployInstanceCommand command, CancellationToken cancellationToken = default)
{
    var envelope = new SiteEnvelope(siteId, command);
    return await GetActor().Ask<DeploymentStatusResponse>(
        envelope, _options.DeploymentTimeout, cancellationToken);
}
```
CentralCommunicationActor.HandleSiteEnvelope extracts the inner message and routes it via the cached ClusterClient:
```csharp
private void HandleSiteEnvelope(SiteEnvelope envelope)
{
    if (!_siteClients.TryGetValue(envelope.SiteId, out var entry))
    {
        _log.Warning("No ClusterClient for site {0}, cannot route message {1}",
            envelope.SiteId, envelope.Message.GetType().Name);
        return; // caller's Ask times out; there is no central buffering
    }

    // Forward the original Sender so the site's reply reaches the caller's Ask directly.
    entry.Client.Tell(
        new ClusterClient.Send("/user/site-communication", envelope.Message),
        Sender);
}
```
No central buffering
If a site is unreachable when a command arrives, the caller's Ask times out. Central never queues command/control messages on behalf of a site. This is deliberate: it keeps the central coordinator stateless with respect to site availability and pushes retry responsibility to the operator or to the Store-and-Forward Engine for messages that tolerate it.
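To make this failure mode concrete, the sketch below models the routing rule with plain .NET types: a dictionary stands in for the per-site ClusterClient cache, and Task.WaitAsync stands in for the Ask timeout. AskSiteAsync and the reply functions are hypothetical; the point is only that an unknown site produces a timeout, never a queued message.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for the per-site ClusterClient cache: site id -> "send and await reply".
var siteClients = new Dictionary<string, Func<string, Task<string>>>
{
    ["site-1"] = msg => Task.FromResult($"ack:{msg}")
};

// Routes a command the way HandleSiteEnvelope does: an unknown site gets nothing sent
// and nothing queued, so the only outcome the caller ever sees is its own timeout.
async Task<string> AskSiteAsync(string siteId, string message, TimeSpan timeout)
{
    if (!siteClients.TryGetValue(siteId, out var send))
    {
        var neverAnswered = new TaskCompletionSource<string>();
        return await neverAnswered.Task.WaitAsync(timeout);   // throws TimeoutException
    }
    return await send(message).WaitAsync(timeout);
}

var ok = await AskSiteAsync("site-1", "deploy", TimeSpan.FromSeconds(1));
string failure;
try
{
    await AskSiteAsync("site-2", "deploy", TimeSpan.FromMilliseconds(100));
    failure = "no timeout";
}
catch (TimeoutException)
{
    failure = "timeout";
}
Console.WriteLine($"{ok} / {failure}");   // ack:deploy / timeout
```

The caller receives a TimeoutException after its own deadline and nothing is retained centrally, so whether to retry is left to the caller.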
Architecture
Central-side: CentralCommunicationActor
CentralCommunicationActor is a ReceiveActor created at /user/central-communication and registered with ClusterClientReceptionist so the site's ClusterClient can locate it. It owns:
- A Dictionary<string, (IActorRef Client, ImmutableHashSet<string> ContactAddresses)> keyed by site identifier, holding one ClusterClient per site.
- A RefreshSiteAddresses periodic timer (60-second cadence, starting immediately). Each tick fires LoadSiteAddressesFromDb, which reads every Site row from the database, parses NodeAAddress and NodeBAddress into Akka receptionist paths ({addr}/system/receptionist), and pipes a SiteAddressCacheLoaded message back to Self. HandleSiteAddressCacheLoaded then creates, updates, or stops ClusterClient actors based on the difference between the cached and the freshly loaded addresses.
- Proxy references to the NotificationOutboxActor and AuditLogIngestActor cluster singletons, injected post-construction via RegisterNotificationOutbox / RegisterAuditIngest messages from the Host. Messages that arrive before a proxy is registered are answered with a non-accepted ack (notifications) or an empty reply (audit), so the site retries without data loss.
- Fanout of SiteHealthReport to the peer central node via DistributedPubSub on the site-health-replica topic, so both central nodes' aggregators stay in sync regardless of which central node the site's ClusterClient load-balanced the report to.
ISiteClientFactory / DefaultSiteClientFactory abstract ClusterClient construction for testability.
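The create/update/stop decision in HandleSiteAddressCacheLoaded can be pictured as a set diff between the cached contact addresses and the freshly loaded ones. This is a minimal sketch under stated assumptions: DiffSites and the action strings are illustrative, HashSet<string> stands in for the actor's ImmutableHashSet<string>, and the real handler acts on ClusterClient actors rather than returning a list.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical diff: which sites need a new client, a replaced client, or a stopped client.
List<(string SiteId, string Action)> DiffSites(
    IReadOnlyDictionary<string, HashSet<string>> cachedSites,
    IReadOnlyDictionary<string, HashSet<string>> loadedSites)
{
    var actions = new List<(string SiteId, string Action)>();
    foreach (var (siteId, addresses) in loadedSites)
    {
        if (!cachedSites.TryGetValue(siteId, out var existing))
            actions.Add((siteId, "create"));   // new site: build a ClusterClient
        else if (!existing.SetEquals(addresses))
            actions.Add((siteId, "update"));   // contact points changed: replace the client
        // identical address sets: keep the existing client untouched
    }
    foreach (var siteId in cachedSites.Keys.Where(id => !loadedSites.ContainsKey(id)))
        actions.Add((siteId, "stop"));         // site row deleted: stop its client
    return actions;
}

var cached = new Dictionary<string, HashSet<string>>
{
    ["site-a"] = new() { "akka.tcp://scadabridge@a1:8081/system/receptionist" },
    ["site-b"] = new() { "akka.tcp://scadabridge@b1:8081/system/receptionist" },
    ["site-d"] = new() { "akka.tcp://scadabridge@d1:8081/system/receptionist" },
};
var loaded = new Dictionary<string, HashSet<string>>
{
    ["site-b"] = new() { "akka.tcp://scadabridge@b2:8081/system/receptionist" },
    ["site-c"] = new() { "akka.tcp://scadabridge@c1:8081/system/receptionist" },
    ["site-d"] = new() { "akka.tcp://scadabridge@d1:8081/system/receptionist" },
};
foreach (var (site, action) in DiffSites(cached, loaded))
    Console.WriteLine($"{site}: {action}");
```

With these inputs, site-b is updated, site-c is created, site-a is stopped, and site-d is left alone. Comparing sets rather than ordered lists means a NodeA/NodeB swap in the database does not force a needless client restart.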
Site-side: SiteCommunicationActor
SiteCommunicationActor is a ReceiveActor created at /user/site-communication and registered with ClusterClientReceptionist. It owns:
- An IActorRef? _centralClient: the site's outbound ClusterClient to central, injected post-construction via RegisterCentralClient.
- A Timers-based heartbeat (default 5-second interval, first tick after 1 second). Each tick sends a HeartbeatMessage with IsActive stamped from the Akka Cluster leader check: the node is active when its MemberStatus is Up and it holds cluster leadership.
- Dispatch to local handlers for every inbound command pattern. Handlers for event-log, parked-message, integration, and artifact patterns are registered post-construction via RegisterLocalHandler; unregistered patterns receive an inline error reply so the central Ask does not stall.
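The IsActive rule and the heartbeat schedule can be stated compactly. In this sketch, plain strings stand in for Akka's MemberStatus and leader address, the site node addresses are hypothetical, and HeartbeatTicks only computes tick offsets (first after 1 s, then every TransportHeartbeatInterval) rather than scheduling real timers.

```csharp
using System;
using System.Linq;

// IsActive as documented: the node is Up *and* is the cluster leader.
// Strings stand in for Akka's MemberStatus and leader Address (assumption).
bool IsActive(string memberStatus, string selfAddress, string? leaderAddress) =>
    memberStatus == "Up" && leaderAddress == selfAddress;

// Heartbeat tick offsets: first tick after initialDelay, then one per interval.
TimeSpan[] HeartbeatTicks(TimeSpan initialDelay, TimeSpan interval, int count) =>
    Enumerable.Range(0, count).Select(i => initialDelay + interval * i).ToArray();

var ticks = HeartbeatTicks(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(5), 4);
Console.WriteLine(string.Join(", ", ticks.Select(t => t.TotalSeconds)));   // 1, 6, 11, 16

const string NodeA = "akka.tcp://scadabridge@site1-a:8081";   // hypothetical site node addresses
const string NodeB = "akka.tcp://scadabridge@site1-b:8081";
Console.WriteLine(IsActive("Up", NodeA, leaderAddress: NodeA));        // True
Console.WriteLine(IsActive("Up", NodeB, leaderAddress: NodeA));        // False
Console.WriteLine(IsActive("Leaving", NodeA, leaderAddress: NodeA));   // False
```

Because IsActive is recomputed on every tick, central sees a leadership change within one heartbeat interval rather than having to infer it from missing heartbeats.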
Site-to-central messages (health reports, audit batches, notification submissions) are sent via:
```csharp
_centralClient.Tell(
    new ClusterClient.Send("/user/central-communication", msg), Sender);
```
For request/response messages, the original Sender is forwarded as the ClusterClient.Send sender so any reply from central routes straight back to the waiting Ask on the site, not through SiteCommunicationActor. For fire-and-forget messages (e.g. SiteHealthReport), Self is used as the sender instead, because no reply is expected.
Address loading and the 60-second refresh
CentralCommunicationActor calls ISiteRepository.GetAllSitesAsync inside a background Task.Run (to avoid blocking the actor thread on a database round-trip) and pipes the result as SiteAddressCacheLoaded. The actor-lifecycle CancellationTokenSource is threaded into the repository call so a slow MS SQL query is cancelled when the actor stops.
A malformed address for one site does not abort the refresh loop: the actor catches the parse failure, logs a warning, skips that site, and processes the rest. The refresh also runs immediately on startup (TimeSpan.Zero initial delay) so the cache is populated as early as possible; a command that arrives before the first load completes finds no ClusterClient, and its Ask times out.
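A per-site parse guard of this kind can be sketched as follows. BuildReceptionistPaths is hypothetical, and System.Uri is used only as a stand-in for Akka's ActorPath.Parse, so its acceptance rules are not identical to Akka's. The shape to notice is that the try/catch sits inside the loop, so one bad row produces a warning and the remaining sites are still processed.

```csharp
using System;
using System.Collections.Generic;

// Turns NodeAAddress/NodeBAddress into receptionist paths, skipping (not aborting on) a bad site.
// Uri parsing and the scheme/port checks are assumptions standing in for ActorPath.Parse.
Dictionary<string, List<string>> BuildReceptionistPaths(
    IEnumerable<(string SiteId, string? NodeA, string? NodeB)> siteRows,
    List<string> warningSink)
{
    var bySite = new Dictionary<string, List<string>>();
    foreach (var (siteId, nodeA, nodeB) in siteRows)
    {
        try
        {
            var contactPaths = new List<string>();
            foreach (var addr in new[] { nodeA, nodeB })
            {
                if (string.IsNullOrWhiteSpace(addr)) continue;   // node not configured
                var trimmed = addr.TrimEnd('/');
                var uri = new Uri(trimmed);                       // UriFormatException on garbage
                if (!uri.Scheme.StartsWith("akka", StringComparison.Ordinal) || uri.Port <= 0)
                    throw new FormatException($"not an Akka address: {addr}");
                contactPaths.Add($"{trimmed}/system/receptionist");
            }
            if (contactPaths.Count == 0)
                warningSink.Add($"site {siteId}: no node addresses, skipped");
            else
                bySite[siteId] = contactPaths;
        }
        catch (FormatException ex)                                // also catches UriFormatException
        {
            warningSink.Add($"site {siteId}: malformed address, skipped ({ex.Message})");
        }
    }
    return bySite;
}

var warnings = new List<string>();
var receptionistPaths = BuildReceptionistPaths(new (string, string?, string?)[]
{
    ("site-a", "akka.tcp://scadabridge@a1:8081", "akka.tcp://scadabridge@a2:8081"),
    ("site-b", "not an address", null),
    ("site-c", "", ""),
}, warnings);
Console.WriteLine($"{receptionistPaths.Count} site(s) usable, {warnings.Count} skipped");   // 1 site(s) usable, 2 skipped
```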
CommunicationService.RefreshSiteAddresses() triggers an on-demand refresh when a site record is added, edited, or deleted from the Central UI or CLI.
gRPC real-time data transport
Real-time attribute value and alarm state changes are delivered over SiteStreamService, a gRPC server-streaming service defined in sitestream.proto.
Site-side — SiteStreamGrpcServer (Kestrel HTTP/2, port 8083):
- Implements SiteStreamService.SiteStreamServiceBase.
- For each SubscribeInstance call, creates a StreamRelayActor (named stream-relay-{correlationId}-{seq}) and subscribes it to ISiteStreamSubscriber. That interface is implemented by SiteStreamManager in the Site Runtime project; SiteStreamGrpcServer holds only the interface so it does not reference SiteRuntime directly.
- Bridges events from the actor thread to the async gRPC write loop via a BoundedChannel<SiteStreamEvent> (capacity 1000, DropOldest).
- Enforces a GrpcMaxConcurrentStreams limit (default 100) and a GrpcMaxStreamLifetime session timeout (default 4 hours) to evict zombie streams.
- Validates correlation_id against ActorPath.IsValidPathElement before using it in an actor name, rejecting invalid values with StatusCode.InvalidArgument.
- During CoordinatedShutdown, CancelAllStreams() flips _shuttingDown, refuses new subscriptions with StatusCode.Unavailable, and cancels all active CancellationTokenSources.
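The actor-to-gRPC bridge relies on System.Threading.Channels. The sketch below shows the DropOldest behaviour with the capacity reduced from 1000 to 3 and plain strings in place of SiteStreamEvent (both simplifications for illustration); it is not the server's actual write loop.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Bounded channel between a producer (the relay actor) and a consumer (the gRPC write loop).
// Capacity is 3 instead of 1000 so the DropOldest behaviour is visible.
var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(3)
{
    FullMode = BoundedChannelFullMode.DropOldest
});

// Producer side: TryWrite never blocks; in DropOldest mode a full channel evicts
// its oldest item to make room, so every write is accepted.
for (var i = 1; i <= 5; i++)
    channel.Writer.TryWrite($"event-{i}");
channel.Writer.Complete();

// Consumer side: drain whatever survived. A real server would write each item
// to the gRPC response stream here instead of collecting it.
var delivered = new List<string>();
await foreach (var evt in channel.Reader.ReadAllAsync())
    delivered.Add(evt);

Console.WriteLine(string.Join(", ", delivered));   // event-3, event-4, event-5
```

With DropOldest, a slow reader loses the oldest buffered updates instead of blocking the producer, so the relay actor's mailbox never backs up behind a stalled gRPC client.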
StreamRelayActor is a lightweight ReceiveActor that converts AttributeValueChanged and AlarmStateChanged domain events to proto SiteStreamEvent messages and writes them to the channel writer:
```csharp
// StreamRelayActor.cs
private void HandleAttributeValueChanged(AttributeValueChanged msg)
{
    var protoEvent = new SiteStreamEvent
    {
        CorrelationId = _correlationId,
        AttributeChanged = new AttributeValueUpdate
        {
            InstanceUniqueName = msg.InstanceUniqueName,
            AttributePath = msg.AttributePath,
            AttributeName = msg.AttributeName,
            Value = ValueFormatter.FormatDisplayValue(msg.Value),
            Quality = MapQuality(msg.Quality),
            Timestamp = Timestamp.FromDateTimeOffset(msg.Timestamp)
        }
    };
    WriteToChannel(protoEvent);
}
```
Central-side — SiteStreamGrpcClient / SiteStreamGrpcClientFactory:
- SiteStreamGrpcClientFactory (singleton) caches one SiteStreamGrpcClient per site identifier. On GetOrCreate, it compares the cached client's Endpoint to the requested endpoint and atomically replaces a stale client (a different endpoint, caused by a NodeA→NodeB failover flip or an edited address) with a fresh one.
- SiteStreamGrpcClient opens a GrpcChannel with HTTP/2 keepalive (KeepAlivePingDelay default 15 s, KeepAlivePingTimeout default 10 s, KeepAlivePingPolicy.Always).
- SubscribeAsync is a plain async Task that calls SubscribeInstance and reads the response stream with await foreach, invoking onEvent for each received event and onError on any non-cancellation exception. The caller (DebugStreamBridgeActor.OpenGrpcStream) launches it inside a Task.Run so the long-running stream loop runs off the actor thread.
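The endpoint-comparison rule in GetOrCreate can be sketched with ConcurrentDictionary.AddOrUpdate. In this sketch a client is reduced to an (Endpoint, Id) tuple and the hostnames are hypothetical; the real factory caches SiteStreamGrpcClient instances.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// "Client" is reduced to (Endpoint, Id); the real factory caches SiteStreamGrpcClient objects.
var clients = new ConcurrentDictionary<string, (string Endpoint, int Id)>();
var nextId = 0;

(string Endpoint, int Id) GetOrCreate(string siteId, string endpoint) =>
    clients.AddOrUpdate(
        siteId,
        _ => (endpoint, Interlocked.Increment(ref nextId)),          // first use: create
        (_, cached) => cached.Endpoint == endpoint
            ? cached                                                  // same endpoint: reuse
            : (endpoint, Interlocked.Increment(ref nextId)));         // stale endpoint: replace

var first = GetOrCreate("site-1", "http://site1-a:8083");
var again = GetOrCreate("site-1", "http://site1-a:8083");
var flipped = GetOrCreate("site-1", "http://site1-b:8083");   // NodeA -> NodeB failover
Console.WriteLine($"{first.Id} {again.Id} {flipped.Id}");      // 1 1 2
```

AddOrUpdate may invoke its delegates more than once under contention, so a version that constructs real GrpcChannel-backed clients would also need to dispose any client that loses the race, as well as the stale client it replaces.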
Debug stream session lifecycle
DebugStreamService manages one DebugStreamBridgeActor per active debug session. On StartStreamAsync, it resolves the instance's site and gRPC addresses, creates the bridge actor, and holds the session in a ConcurrentDictionary.
DebugStreamBridgeActor (one per session, short-lived, no persistence):
- In PreStart, sends SubscribeDebugViewRequest to CentralCommunicationActor, which routes it to the site over ClusterClient; the reply carries the initial snapshot.
- On receiving DebugViewSnapshot, fires onEvent(snapshot) and calls OpenGrpcStream.
- OpenGrpcStream calls _grpcFactory.GetOrCreate(siteId, endpoint) and launches client.SubscribeAsync(...) as a background task. Domain events are marshalled back to the actor via Self.Tell for thread safety.
- On a gRPC error, flips to the other node endpoint and retries (the first retry is immediate; subsequent retries wait ReconnectDelay, default 5 s). The retry budget (MaxRetries = 3) is restored only after StabilityWindow (default 60 s) of uninterrupted connection; a stream that delivers one event and then immediately fails does not count as stable.
- On StopDebugStream, cancels the gRPC subscription and sends UnsubscribeDebugViewRequest to the site via ClusterClient.
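The retry rules above interact in a way that is easier to follow as code. The following is a hypothetical reconstruction, not DebugStreamBridgeActor's source: the documented defaults (MaxRetries = 3, ReconnectDelay 5 s, StabilityWindow 60 s) are hard-coded, time is passed in explicitly, and OnConnected/OnStreamError are illustrative names.

```csharp
using System;

// Documented defaults; the state machine around them is a hypothetical reconstruction.
var maxRetries = 3;
var reconnectDelay = TimeSpan.FromSeconds(5);
var stabilityWindow = TimeSpan.FromSeconds(60);

var retriesUsed = 0;
var useNodeA = true;
DateTimeOffset? connectedSince = null;

void OnConnected(DateTimeOffset now) => connectedSince = now;

// Returns the next attempt, or null once the budget is spent (the actor would call onTerminated).
(TimeSpan Delay, bool UseNodeA)? OnStreamError(DateTimeOffset now)
{
    // Only a connection that stayed up for the whole stability window earns the budget back.
    if (connectedSince is { } since && now - since >= stabilityWindow)
        retriesUsed = 0;
    connectedSince = null;

    if (retriesUsed >= maxRetries)
        return null;

    useNodeA = !useNodeA;                                           // fail over to the other node
    var delay = retriesUsed == 0 ? TimeSpan.Zero : reconnectDelay;  // first retry is immediate
    retriesUsed++;
    return (delay, useNodeA);
}

var t0 = DateTimeOffset.UnixEpoch;
OnConnected(t0);
var r1 = OnStreamError(t0.AddSeconds(2));    // up 2 s: not stable, immediate retry on node B
OnConnected(t0.AddSeconds(2));
var r2 = OnStreamError(t0.AddSeconds(3));    // 5 s delay, back to node A
OnConnected(t0.AddSeconds(8));
var r3 = OnStreamError(t0.AddSeconds(80));   // up 72 s: budget restored, immediate retry
var r4 = OnStreamError(t0.AddSeconds(81));   // reconnect attempts keep failing...
var r5 = OnStreamError(t0.AddSeconds(86));
var r6 = OnStreamError(t0.AddSeconds(91));   // ...three retries used up, so this returns null
Console.WriteLine(r6 is null ? "terminated" : "retrying");   // terminated
```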
Proto definition summary
```protobuf
// Protos/sitestream.proto
service SiteStreamService {
  rpc SubscribeInstance(InstanceStreamRequest) returns (stream SiteStreamEvent);
  rpc IngestAuditEvents(AuditEventBatch) returns (IngestAck);
  rpc IngestCachedTelemetry(CachedTelemetryBatch) returns (IngestAck);
  rpc PullAuditEvents(PullAuditEventsRequest) returns (PullAuditEventsResponse);
}
```
SubscribeInstance carries the real-time data stream. The other three RPCs (IngestAuditEvents, IngestCachedTelemetry, PullAuditEvents) serve the Audit Log component's gRPC telemetry push and reconciliation pull paths — SiteStreamGrpcServer hosts them on the same port because sites already listen there.
SiteStreamEvent uses a oneof event { AttributeValueUpdate, AlarmStateUpdate } discriminator. AlarmStateUpdate carries the full native alarm condition (fields 8–21) alongside the base computed-alarm fields (1–7). The native fields were added under new field numbers, so older clients, which ignore unknown fields, continue to work.
Usage
Central callers interact through CommunicationService, which wraps each command pattern in a typed Ask with a per-pattern timeout:
| Pattern | Method | Timeout |
|---|---|---|
| Instance deployment | DeployInstanceAsync | 120 s |
| Instance lifecycle | DisableInstanceAsync, EnableInstanceAsync, DeleteInstanceAsync | 30 s |
| Artifact deployment | DeployArtifactsAsync | 60 s |
| Integration routing | RouteIntegrationCallAsync | 30 s |
| Debug snapshot | RequestDebugSnapshotAsync | 30 s |
| Remote queries | QueryEventLogsAsync, QueryParkedMessagesAsync, etc. | 30 s |
| OPC UA tag browse | BrowseNodeAsync | 30 s |
| Notification outbox (central-local) | QueryNotificationOutboxAsync, RetryNotificationAsync, etc. | 30 s |
| Site Call Audit (central-local) | QuerySiteCallsAsync, RetrySiteCallAsync, etc. | 30 s |
Notification Outbox and Site Call Audit actors are central-local singletons — their CommunicationService methods Ask the proxy directly without wrapping in SiteEnvelope.
For real-time streaming, callers use DebugStreamService.StartStreamAsync, which creates a DebugStreamBridgeActor and returns a session handle. Ongoing events arrive via the onEvent callback; session teardown is via StopStreamAsync.
Configuration
All options are bound from the Communication section via CommunicationOptions:
| Key | Default | Description |
|---|---|---|
| DeploymentTimeout | 00:02:00 | Ask timeout for instance deployment commands. |
| LifecycleTimeout | 00:00:30 | Ask timeout for lifecycle commands (disable, enable, delete). |
| ArtifactDeploymentTimeout | 00:01:00 | Ask timeout for system-wide artifact deployment. |
| QueryTimeout | 00:00:30 | Ask timeout for remote queries and management commands. |
| IntegrationTimeout | 00:00:30 | Ask timeout for integration routing and Inbound API routing. |
| DebugViewTimeout | 00:00:10 | Ask timeout for the debug subscribe/unsubscribe handshake. |
| NotificationForwardTimeout | 00:00:30 | Ask timeout for notification submission forwarding. |
| CentralContactPoints | [] | Site-side: Akka addresses of central nodes, e.g. akka.tcp://scadabridge@central-a:8081. |
| GrpcKeepAlivePingDelay | 00:00:15 | HTTP/2 keepalive PING interval on SiteStreamGrpcClient. |
| GrpcKeepAlivePingTimeout | 00:00:10 | HTTP/2 keepalive PING timeout. |
| GrpcMaxStreamLifetime | 04:00:00 | Per-stream session timeout; forces reconnect of zombie streams. |
| GrpcMaxConcurrentStreams | 100 | Max concurrent SubscribeInstance streams per site node. |
| TransportHeartbeatInterval | 00:00:05 | SiteCommunicationActor heartbeat cadence. |
| TransportFailureThreshold | 00:00:15 | Akka remoting failure-detection threshold. |
Three layers of dead-client detection protect the gRPC stream path:
| Layer | Detects | Typical detection time |
|---|---|---|
| TCP RST | Clean process death, connection close | ~1–5 s |
| gRPC keepalive PING | Network partition, silent crash | ~25 s |
| Session timeout (GrpcMaxStreamLifetime) | Zombie streams with misconfigured keepalive | 4 h |
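The ~25 s figure for the keepalive layer follows from the two keepalive defaults in the configuration table: a silent peer is pinged at most GrpcKeepAlivePingDelay after the last activity, and the connection is declared dead once that ping goes unacknowledged for GrpcKeepAlivePingTimeout. A quick check of the arithmetic, treating the result as an approximate upper bound:

```csharp
using System;

// Documented defaults, parsed from the same hh:mm:ss strings used in configuration.
var pingDelay = TimeSpan.Parse("00:00:15");     // GrpcKeepAlivePingDelay
var pingTimeout = TimeSpan.Parse("00:00:10");   // GrpcKeepAlivePingTimeout
var maxLifetime = TimeSpan.Parse("04:00:00");   // GrpcMaxStreamLifetime

// Worst case for a silent peer: wait up to one ping interval, then the full ping timeout.
var keepaliveDetection = pingDelay + pingTimeout;

Console.WriteLine($"keepalive layer: ~{keepaliveDetection.TotalSeconds} s");   // keepalive layer: ~25 s
Console.WriteLine($"session-timeout backstop: {maxLifetime.TotalHours} h");   // session-timeout backstop: 4 h
```

Lowering either keepalive value shortens detection at the cost of more PING traffic per connection; the 4-hour lifetime only matters when keepalive itself is not working.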
Dependencies & Interactions
- Commons (#16): owns the message contracts used by this component: DeployInstanceCommand, SiteEnvelope, HeartbeatMessage, SiteHealthReport, SiteHealthReportReplica, RegisterNotificationOutbox, IngestAuditEventsCommand, IngestCachedTelemetryCommand, and the other request/response records. Commons does not hold an Akka package reference, so RegisterAuditIngest (which carries an IActorRef) is the exception and lives in this project instead.
- Cluster Infrastructure (#13): provides ClusterClientReceptionist registration and the active/standby leader model that both SiteCommunicationActor's IsActive check and CentralCommunicationActor's DistributedPubSub fanout depend on.
- Configuration Database (#17): provides ISiteRepository.GetAllSitesAsync for address loading; site records carry NodeAAddress, NodeBAddress, GrpcNodeAAddress, and GrpcNodeBAddress.
- Deployment Manager (#2): the primary consumer of command/control patterns 1–3 (instance deployment, instance lifecycle, artifact deployment). CommunicationService is injected into the Deployment Manager actor to send deployments, lifecycle commands, and artifact deployments to sites.
- Site Runtime (#3): SiteCommunicationActor forwards inbound commands to the DeploymentManager singleton proxy. SiteStreamManager (in Site Runtime) implements ISiteStreamSubscriber, so SiteStreamGrpcServer can subscribe relay actors to instance event feeds without referencing Site Runtime directly.
- Health Monitoring (#11): CentralCommunicationActor calls ICentralHealthAggregator.MarkHeartbeat and ProcessReport for every inbound heartbeat and health report. DistributedPubSub fanout keeps both central nodes' aggregators in sync.
- Audit Log (#23): SiteStreamGrpcServer hosts the IngestAuditEvents, IngestCachedTelemetry, and PullAuditEvents RPCs. CentralCommunicationActor routes IngestAuditEventsCommand / IngestCachedTelemetryCommand ClusterClient messages to the AuditLogIngestActor proxy.
- Notification Outbox (#21): CentralCommunicationActor routes NotificationSubmit / NotificationStatusQuery messages from sites to the NotificationOutboxActor proxy. CommunicationService Asks the proxy directly for central-UI outbox management calls.
- Site Call Audit (#22): CommunicationService Asks the SiteCallAuditActor proxy directly for query and relay operations. SiteCallAuditActor issues RetryParkedOperation / DiscardParkedOperation relay commands to sites via SiteEnvelope; SiteCommunicationActor dispatches them to _parkedMessageHandler.
- Store-and-Forward Engine (#6): the site S&F Engine drives NotificationSubmit forwarding and cached-call telemetry emission through SiteCommunicationActor. Parked-message queries and retry/discard relay commands flow in the opposite direction.
- Management Service (#18): ManagementActor is registered with ClusterClientReceptionist at /user/management on central; the CLI connects via its own separate ClusterClient. This ClusterClient usage is distinct from the inter-cluster hub-and-spoke connections managed by this component.
- Design spec: Component-Communication.md.
Troubleshooting
A site's commands fail immediately
Check that NodeAAddress and NodeBAddress are populated in the site configuration. If both are empty, CentralCommunicationActor logs a warning and skips that site on every refresh, so no ClusterClient is created and every command to that site times out. After adding an address, call CommunicationService.RefreshSiteAddresses() to trigger an on-demand refresh instead of waiting for the next 60-second tick.
Commands are timing out but the site is reachable
A single malformed address string for one site prevents ClusterClient creation for that site while other sites are unaffected, so timeouts for that one site may be the only visible symptom. Check the logs for a Warning line from HandleSiteAddressCacheLoaded naming the offending site. The actor's parse guard catches the ActorPath.Parse exception per site so the rest of the refresh proceeds.
A Warning from the Status.Failure handler in CentralCommunicationActor means LoadSiteAddressesFromDb itself threw (typically a SQL connection error); the cache is left stale until the next successful refresh.
gRPC debug stream drops immediately after opening
SiteStreamGrpcServer rejects correlation_id values that contain characters invalid in Akka actor names (/, whitespace, etc.) with StatusCode.InvalidArgument. Verify that the calling DebugStreamBridgeActor generates a safe correlation ID.
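One way for a caller to guarantee a safe correlation_id is to mint it from a GUID. The sketch below assumes that approach; NewCorrelationId and LooksSafe are hypothetical helpers, and the character check is deliberately stricter than Akka's ActorPath.IsValidPathElement rather than a copy of it.

```csharp
using System;
using System.Linq;

// A GUID in "N" format is 32 hex digits: no '/', whitespace, or other characters
// that Akka rejects in actor names, so "stream-relay-{correlationId}-{seq}" stays valid.
string NewCorrelationId() => Guid.NewGuid().ToString("N");

// Conservative client-side pre-check (assumption): ASCII letters, digits, '-' and '_' only.
// This is stricter than ActorPath.IsValidPathElement, which the server still enforces.
bool LooksSafe(string candidate) =>
    candidate.Length > 0 &&
    candidate.All(c => c < 128 && (char.IsLetterOrDigit(c) || c == '-' || c == '_'));

var id = NewCorrelationId();
Console.WriteLine($"{id.Length} {LooksSafe(id)}");   // 32 True
Console.WriteLine(LooksSafe("session 42/debug"));     // False
```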
After a site node failover, the DebugStreamBridgeActor attempts to reconnect to the other node endpoint (_useNodeA flips on each error). If both nodes are unreachable, the actor exhausts its 3-retry budget and calls onTerminated. The engineer must restart the debug session.
Heartbeats arrive but health reports do not
SiteCommunicationActor sends heartbeats and health reports via separate paths. Health reports are sent only when the site's HealthReportSender publishes them (every 30 s by default). If heartbeats arrive but reports do not, the health-report sender on the site may have faulted — check site-side logs for errors in HealthReportSender.