# Central–Site Communication

The Central–Site Communication component is the transport layer that connects the central cluster to every site cluster. It provides two independent transports — Akka.NET `ClusterClient` for command/control and gRPC server-streaming for real-time data — wired together through a pair of actors that each cluster registers with the `ClusterClientReceptionist`.

## Overview

Communication (#5) runs on every node in every cluster. The component code lives in `src/ZB.MOM.WW.ScadaBridge.Communication/`, organised as follows:

- `Actors/` — `CentralCommunicationActor`, `SiteCommunicationActor`, `DebugStreamBridgeActor`, `StreamRelayActor`.
- `Grpc/` — `SiteStreamGrpcServer`, `SiteStreamGrpcClient`, `SiteStreamGrpcClientFactory`, `ISiteStreamSubscriber`, and the proto DTO mappers.
- `Protos/` — `sitestream.proto` (proto source; generated C# is vendored in `SiteStreamGrpc/`).
- `CommunicationService.cs` — typed Ask-pattern façade used by callers on the central side.
- `DebugStreamService.cs` — session manager for debug stream bridge actors.
- `CommunicationOptions.cs` — configuration options class.
- `ServiceCollectionExtensions.cs` — DI registration (`AddCommunication`).

The Host composition root performs DI registration by calling `AddCommunication`. The actors themselves are created inside `AkkaHostedService.RegisterCentralActors` / `RegisterSiteActors` because they must be created within the actor system, not by the DI container.

## Key Concepts

### Two transports, two concerns

| Transport | Direction | Purpose |
|-----------|-----------|---------|
| Akka.NET `ClusterClient` | bidirectional (command/control) | Deployments, lifecycle, subscribe/unsubscribe handshake, snapshots, heartbeats, health reports, telemetry, notifications |
| gRPC server-streaming (`SiteStreamService`) | site → central | Real-time attribute value and alarm state changes |

The transports are independent. A gRPC stream interruption does not affect in-flight `ClusterClient` commands, and vice versa.

### Hub-and-spoke topology

Sites do not communicate with each other. All inter-cluster traffic flows through central. Central maintains one `ClusterClient` per site; each site maintains a single `ClusterClient` pointed at both central nodes.

### `SiteEnvelope` routing

Central-side callers wrap outbound messages in a `SiteEnvelope(SiteId, Message)`. `CentralCommunicationActor` resolves the site's `ClusterClient` by `SiteId` and forwards the inner message to `/user/site-communication` on the site:

```csharp
// CommunicationService.cs — deployment pattern
public async Task DeployInstanceAsync(
    string siteId,
    DeployInstanceCommand command,
    CancellationToken cancellationToken = default)
{
    var envelope = new SiteEnvelope(siteId, command);
    return await GetActor().Ask(
        envelope, _options.DeploymentTimeout, cancellationToken);
}
```

`CentralCommunicationActor.HandleSiteEnvelope` extracts the inner message and routes it via the cached `ClusterClient`:

```csharp
private void HandleSiteEnvelope(SiteEnvelope envelope)
{
    if (!_siteClients.TryGetValue(envelope.SiteId, out var entry))
    {
        _log.Warning("No ClusterClient for site {0}, cannot route message {1}",
            envelope.SiteId, envelope.Message.GetType().Name);
        return; // caller's Ask times out — no central buffering
    }

    entry.Client.Tell(
        new ClusterClient.Send("/user/site-communication", envelope.Message),
        Sender);
}
```

### No central buffering

If a site is unreachable when a command arrives, the caller's Ask times out. Central never queues command/control messages on behalf of a site. This is deliberate: it keeps the central coordinator stateless with respect to site availability and pushes retry responsibility to the operator or to the Store-and-Forward Engine for messages that tolerate it.
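
The no-buffering rule can be illustrated without Akka.NET. The sketch below is a minimal stand-in rather than the component's code: `routes`, `RouteAsync`, and the site names are hypothetical, and `Task.WaitAsync` (.NET 6 or later) plays the role of the Ask timeout. A message for an unknown site is dropped, nothing is queued, and the caller sees a timeout.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical routing table: site id -> handler standing in for that site's ClusterClient.
var routes = new Dictionary<string, Func<object, Task<object>>>
{
    ["site-a"] = msg => Task.FromResult<object>($"ack:{msg}")
};

// Stand-in for Ask: an unroutable message is dropped and nothing is queued,
// so the reply never arrives and the caller's timeout fires.
Task<object> RouteAsync(string siteId, object message, TimeSpan timeout)
{
    Task<object> reply = routes.TryGetValue(siteId, out var handler)
        ? handler(message)
        : new TaskCompletionSource<object>().Task; // never completes
    return reply.WaitAsync(timeout);
}

var ok = await RouteAsync("site-a", "deploy", TimeSpan.FromSeconds(1));
Console.WriteLine(ok); // ack:deploy

var timedOut = false;
try
{
    await RouteAsync("site-b", "deploy", TimeSpan.FromMilliseconds(100));
}
catch (TimeoutException)
{
    timedOut = true; // retrying is the caller's decision; no central queue exists
}
Console.WriteLine(timedOut);
```

Because the timeout is the only failure signal in this model, any retry policy has to live with the caller, which matches the responsibility split described above.
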
## Architecture

### Central-side: `CentralCommunicationActor`

`CentralCommunicationActor` is a `ReceiveActor` created at `/user/central-communication` and registered with `ClusterClientReceptionist` so the site's `ClusterClient` can locate it. It owns:

- A dictionary keyed by site identifier whose entries pair a `Client` reference with its `ContactAddresses` — one `ClusterClient` per site.
- A `RefreshSiteAddresses` periodic timer (60-second cadence, starting immediately). Each tick fires `LoadSiteAddressesFromDb`, which reads every `Site` row from the database, parses `NodeAAddress` and `NodeBAddress` into Akka receptionist paths (`{addr}/system/receptionist`), and pipes a `SiteAddressCacheLoaded` message back to Self. `HandleSiteAddressCacheLoaded` creates, updates, or stops `ClusterClient` actors based on the diff.
- Proxy references to `NotificationOutboxActor` and `AuditLogIngestActor` cluster singletons, injected post-construction via `RegisterNotificationOutbox` / `RegisterAuditIngest` messages from the Host. Messages that arrive before the proxy is registered are answered with a non-accepted ack (notifications) or an empty reply (audit), so the site retries without data loss.
- Fanout of `SiteHealthReport` to the peer central node via `DistributedPubSub`, keyed on the `site-health-replica` topic, so both central nodes' aggregators stay in sync regardless of which central node the site's `ClusterClient` load-balanced the report to.

`ISiteClientFactory` / `DefaultSiteClientFactory` abstract `ClusterClient` construction for testability.

### Site-side: `SiteCommunicationActor`

`SiteCommunicationActor` is a `ReceiveActor` created at `/user/site-communication` and registered with `ClusterClientReceptionist`. It owns:

- An `IActorRef? _centralClient` — the site's outbound `ClusterClient` to central. Injected post-construction via `RegisterCentralClient`.
- A `Timers`-based heartbeat (default 5-second interval, first tick after 1 second). Each tick sends a `HeartbeatMessage` with `IsActive` stamped from the Akka `Cluster` leader check — the node is active when its `MemberStatus` is `Up` and it holds cluster leadership.
- Dispatch to local handlers for every inbound command pattern. Handlers for event-log, parked-message, integration, and artifact patterns are registered post-construction via `RegisterLocalHandler`; unregistered patterns receive an inline error reply so the central Ask does not stall.

Site-to-central messages (health reports, audit batches, notification submissions) are sent via:

```csharp
_centralClient.Tell(
    new ClusterClient.Send("/user/central-communication", msg),
    Sender);
```

For request/response messages, the original `Sender` is forwarded as the `ClusterClient.Send` sender so any reply from central routes straight back to the waiting Ask on the site, not through `SiteCommunicationActor`. For fire-and-forget messages (e.g. `SiteHealthReport`), `Self` is used as the sender instead, because no reply is expected.

### Address loading and the 60-second refresh

`CentralCommunicationActor` calls `ISiteRepository.GetAllSitesAsync` inside a background `Task.Run` (to avoid blocking the actor thread on a database round-trip) and pipes the result as `SiteAddressCacheLoaded`. The actor-lifecycle `CancellationTokenSource` is threaded into the repository call so a slow MS SQL query is cancelled when the actor stops.

A malformed address for one site does not abort the refresh loop — the actor catches the parse failure, logs a warning, skips that site, and processes the rest. The refresh also runs immediately on startup (`TimeSpan.Zero` initial delay) so the cache is populated before the first command arrives.

`CommunicationService.RefreshSiteAddresses()` triggers an on-demand refresh when a site record is added, edited, or deleted from the Central UI or CLI.
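
The skip-and-continue behaviour of the address refresh can be sketched in plain C#. This is an illustration under stated assumptions, not the actor's implementation: `ToReceptionistPath` is a hypothetical stand-in for the real `ActorPath.Parse` guard, the site rows are invented, and site addresses are assumed to follow the same `akka.tcp://system@host:port` shape as the `CentralContactPoints` example.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for ActorPath.Parse: accept only "akka.tcp://system@host:port".
string ToReceptionistPath(string address)
{
    if (!address.StartsWith("akka.tcp://", StringComparison.Ordinal) || !address.Contains('@'))
        throw new FormatException($"Malformed node address '{address}'");
    return $"{address}/system/receptionist";
}

// Invented site rows: (SiteId, NodeAAddress, NodeBAddress).
var sites = new (string SiteId, string NodeA, string NodeB)[]
{
    ("site-a", "akka.tcp://scadabridge@site-a1:8081", "akka.tcp://scadabridge@site-a2:8081"),
    ("site-b", "not an address", ""),
    ("site-c", "akka.tcp://scadabridge@site-c1:8081", "")
};

var cache = new Dictionary<string, List<string>>();
var skipped = new List<string>();

foreach (var site in sites)
{
    try
    {
        var contacts = new List<string>();
        foreach (var address in new[] { site.NodeA, site.NodeB })
        {
            if (!string.IsNullOrWhiteSpace(address))
                contacts.Add(ToReceptionistPath(address));
        }

        // A site with no usable address gets no client at all.
        if (contacts.Count == 0) { skipped.Add(site.SiteId); continue; }
        cache[site.SiteId] = contacts;
    }
    catch (FormatException ex)
    {
        // One bad row is logged and skipped; the remaining sites still load.
        Console.WriteLine($"WARN {site.SiteId}: {ex.Message}");
        skipped.Add(site.SiteId);
    }
}

Console.WriteLine($"loaded={cache.Count} skipped={skipped.Count}"); // loaded=2 skipped=1
```

Catching the failure inside the per-site loop, rather than around the whole loop, is what keeps one bad row from affecting the other sites.
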
### gRPC real-time data transport

Real-time attribute value and alarm state changes are delivered over `SiteStreamService`, a gRPC server-streaming service defined in `sitestream.proto`.

**Site-side** — `SiteStreamGrpcServer` (Kestrel HTTP/2, port 8083):

- Implements `SiteStreamService.SiteStreamServiceBase`.
- For each `SubscribeInstance` call, creates a `StreamRelayActor` (named `stream-relay-{correlationId}-{seq}`) and subscribes it to `ISiteStreamSubscriber` (implemented by `SiteStreamManager` in the Site Runtime project — `SiteStreamGrpcServer` holds only the interface so it does not reference `SiteRuntime` directly).
- Bridges events via a `BoundedChannel` (capacity 1000, `DropOldest`) from the actor thread to the async gRPC write loop.
- Enforces a `GrpcMaxConcurrentStreams` limit (default 100) and a `GrpcMaxStreamLifetime` session timeout (default 4 hours) to evict zombie streams.
- Validates `correlation_id` against `ActorPath.IsValidPathElement` before use in an actor name, rejecting invalid values with `StatusCode.InvalidArgument`.
- During `CoordinatedShutdown`, `CancelAllStreams()` flips `_shuttingDown`, refuses new subscriptions with `StatusCode.Unavailable`, and cancels all active `CancellationTokenSource`s.
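
The `DropOldest` bridging between the relay actor and the gRPC write loop can be seen in isolation with `System.Threading.Channels`. The sketch below is an assumption-laden miniature: it uses `int` items and a capacity of 3 instead of 1000 so the effect is visible, and the producer and consumer loops only stand in for the actor and the write loop.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Capacity is 3 here for readability; the text above cites 1000.
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(3)
{
    FullMode = BoundedChannelFullMode.DropOldest,
    SingleReader = true
});

// Producer side (the relay actor in the text): TryWrite never blocks;
// when the buffer is full the oldest queued item is discarded.
for (var i = 1; i <= 5; i++)
    channel.Writer.TryWrite(i);
channel.Writer.Complete();

// Consumer side (the gRPC write loop in the text) drains whatever survived.
var delivered = new List<int>();
await foreach (var item in channel.Reader.ReadAllAsync())
    delivered.Add(item);

Console.WriteLine(string.Join(",", delivered)); // 3,4,5
```

The trade-off is that a slow consumer loses the oldest events rather than stalling the producer, which suits a live view where the most recent values matter most.
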

`StreamRelayActor` is a lightweight `ReceiveActor` that converts `AttributeValueChanged` and `AlarmStateChanged` domain events to proto `SiteStreamEvent` messages and writes them to the channel writer:

```csharp
// StreamRelayActor.cs
private void HandleAttributeValueChanged(AttributeValueChanged msg)
{
    var protoEvent = new SiteStreamEvent
    {
        CorrelationId = _correlationId,
        AttributeChanged = new AttributeValueUpdate
        {
            InstanceUniqueName = msg.InstanceUniqueName,
            AttributePath = msg.AttributePath,
            AttributeName = msg.AttributeName,
            Value = ValueFormatter.FormatDisplayValue(msg.Value),
            Quality = MapQuality(msg.Quality),
            Timestamp = Timestamp.FromDateTimeOffset(msg.Timestamp)
        }
    };
    WriteToChannel(protoEvent);
}
```

**Central-side** — `SiteStreamGrpcClient` / `SiteStreamGrpcClientFactory`:

- `SiteStreamGrpcClientFactory` (singleton) caches one `SiteStreamGrpcClient` per site identifier. On `GetOrCreate`, it compares the cached client's `Endpoint` to the requested endpoint and atomically replaces a stale client (different endpoint — NodeA→NodeB failover flip, or an edited address) with a fresh one.
- `SiteStreamGrpcClient` opens a `GrpcChannel` with HTTP/2 keepalive (`KeepAlivePingDelay` default 15 s, `KeepAlivePingTimeout` default 10 s, `KeepAlivePingPolicy.Always`). `SubscribeAsync` is a plain `async Task` that calls `SubscribeInstance` and reads the response stream with `await foreach`, invoking `onEvent` for each received event and `onError` on any non-cancellation exception. The caller (`DebugStreamBridgeActor.OpenGrpcStream`) launches it inside a `Task.Run` so the long-running stream loop runs off the actor thread.

### Debug stream session lifecycle

`DebugStreamService` manages one `DebugStreamBridgeActor` per active debug session. On `StartStreamAsync`, it resolves the instance's site and gRPC addresses, creates the bridge actor, and holds the session in a `ConcurrentDictionary`.

`DebugStreamBridgeActor` (one per session, short-lived, no persistence):

1. In `PreStart`, sends `SubscribeDebugViewRequest` to `CentralCommunicationActor` (ClusterClient, for the initial snapshot).
2. On receiving `DebugViewSnapshot`, fires `onEvent(snapshot)` and calls `OpenGrpcStream`.
3. `OpenGrpcStream` calls `_grpcFactory.GetOrCreate(siteId, endpoint)` and launches `client.SubscribeAsync(...)` as a background task. Domain events are marshalled back to the actor via `Self.Tell` for thread safety.
4. On a gRPC error, flips to the other node endpoint and retries (first retry immediate, subsequent retries with `ReconnectDelay` default 5 s). The retry budget (`MaxRetries = 3`) is recovered only after `StabilityWindow` (default 60 s) of uninterrupted connection — a stream that delivers one event then immediately fails does not count as stable.
5. On `StopDebugStream`, cancels the gRPC subscription and sends `UnsubscribeDebugViewRequest` to the site via ClusterClient.

### Proto definition summary

```proto
// Protos/sitestream.proto
service SiteStreamService {
  rpc SubscribeInstance(InstanceStreamRequest) returns (stream SiteStreamEvent);
  rpc IngestAuditEvents(AuditEventBatch) returns (IngestAck);
  rpc IngestCachedTelemetry(CachedTelemetryBatch) returns (IngestAck);
  rpc PullAuditEvents(PullAuditEventsRequest) returns (PullAuditEventsResponse);
}
```

`SubscribeInstance` carries the real-time data stream. The other three RPCs (`IngestAuditEvents`, `IngestCachedTelemetry`, `PullAuditEvents`) serve the Audit Log component's gRPC telemetry push and reconciliation pull paths — `SiteStreamGrpcServer` hosts them on the same port because sites already listen there.

`SiteStreamEvent` uses a `oneof event { AttributeValueUpdate, AlarmStateUpdate }` discriminator. `AlarmStateUpdate` carries the full native alarm condition (fields 8–21) alongside the base computed-alarm fields (1–7), added additively so old clients ignoring unknown fields continue to work.
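
Why additive fields are safe for older readers follows from the protobuf wire format: every field is prefixed with a tag carrying its number and wire type, so a reader can step over numbers it does not know. The sketch below hand-decodes a tiny payload to show this. It is not how the generated classes are used in practice, and the payload is hypothetical: the source does not state the field types of `AlarmStateUpdate`, so field 1 is assumed to be a string and field 8 a varint purely for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical payload: field 1 (length-delimited string "A1") plus field 8 (varint 5).
// Tag = (field_number << 3) | wire_type; wire type 0 = varint, 2 = length-delimited.
byte[] payload = { 0x0A, 0x02, 0x41, 0x31, 0x40, 0x05 };

var known = new Dictionary<int, string>();  // fields an "old" reader understands (1 to 7)
var skippedFields = new List<int>();        // field numbers it steps over
var pos = 0;

// Reads one base-128 varint starting at the current position.
ulong ReadVarint()
{
    ulong result = 0;
    var shift = 0;
    while (true)
    {
        var b = payload[pos++];
        result |= (ulong)(b & 0x7F) << shift;
        if ((b & 0x80) == 0) return result;
        shift += 7;
    }
}

while (pos < payload.Length)
{
    var tag = ReadVarint();
    var field = (int)(tag >> 3);
    var wireType = (int)(tag & 0x7);

    if (wireType == 0)
    {
        var number = ReadVarint();
        if (field <= 7) known[field] = number.ToString(); else skippedFields.Add(field);
    }
    else if (wireType == 2)
    {
        var length = (int)ReadVarint();
        if (field <= 7) known[field] = Encoding.UTF8.GetString(payload, pos, length);
        else skippedFields.Add(field);
        pos += length; // advance past the bytes whether or not the field was understood
    }
    else
    {
        throw new NotSupportedException($"Wire type {wireType} is outside this sketch");
    }
}

Console.WriteLine(known[1]);                        // A1
Console.WriteLine(string.Join(",", skippedFields)); // 8
```

The reader never needs the schema for field 8 to stay in sync with the byte stream; the wire type alone tells it how far to advance.
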
## Usage

Central callers interact through `CommunicationService`, which wraps each command pattern in a typed `Ask` with a per-pattern timeout:

| Pattern | Method | Timeout |
|---------|--------|---------|
| Instance deployment | `DeployInstanceAsync` | 120 s |
| Instance lifecycle | `DisableInstanceAsync`, `EnableInstanceAsync`, `DeleteInstanceAsync` | 30 s |
| Artifact deployment | `DeployArtifactsAsync` | 60 s |
| Integration routing | `RouteIntegrationCallAsync` | 30 s |
| Debug snapshot | `RequestDebugSnapshotAsync` | 30 s |
| Remote queries | `QueryEventLogsAsync`, `QueryParkedMessagesAsync`, etc. | 30 s |
| OPC UA tag browse | `BrowseNodeAsync` | 30 s |
| Notification outbox (central-local) | `QueryNotificationOutboxAsync`, `RetryNotificationAsync`, etc. | 30 s |
| Site Call Audit (central-local) | `QuerySiteCallsAsync`, `RetrySiteCallAsync`, etc. | 30 s |

Notification Outbox and Site Call Audit actors are central-local singletons — their `CommunicationService` methods Ask the proxy directly without wrapping in `SiteEnvelope`.

For real-time streaming, callers use `DebugStreamService.StartStreamAsync`, which creates a `DebugStreamBridgeActor` and returns a session handle. Ongoing events arrive via the `onEvent` callback; session teardown is via `StopStreamAsync`.

## Configuration

All options are bound from the `Communication` section via `CommunicationOptions`:

| Key | Default | Description |
|-----|---------|-------------|
| `DeploymentTimeout` | `00:02:00` | Ask timeout for instance deployment commands. |
| `LifecycleTimeout` | `00:00:30` | Ask timeout for lifecycle commands (disable, enable, delete). |
| `ArtifactDeploymentTimeout` | `00:01:00` | Ask timeout for system-wide artifact deployment. |
| `QueryTimeout` | `00:00:30` | Ask timeout for remote queries and management commands. |
| `IntegrationTimeout` | `00:00:30` | Ask timeout for integration routing and Inbound API routing. |
| `DebugViewTimeout` | `00:00:10` | Ask timeout for debug subscribe/unsubscribe handshake. |
| `NotificationForwardTimeout` | `00:00:30` | Ask timeout for notification submission forwarding. |
| `CentralContactPoints` | `[]` | Site-side: Akka addresses of central nodes, e.g. `akka.tcp://scadabridge@central-a:8081`. |
| `GrpcKeepAlivePingDelay` | `00:00:15` | HTTP/2 keepalive PING interval on `SiteStreamGrpcClient`. |
| `GrpcKeepAlivePingTimeout` | `00:00:10` | HTTP/2 keepalive PING timeout. |
| `GrpcMaxStreamLifetime` | `04:00:00` | Per-stream session timeout; forces reconnect of zombie streams. |
| `GrpcMaxConcurrentStreams` | `100` | Max concurrent `SubscribeInstance` streams per site node. |
| `TransportHeartbeatInterval` | `00:00:05` | `SiteCommunicationActor` heartbeat cadence. |
| `TransportFailureThreshold` | `00:00:15` | Akka remoting failure-detection threshold. |

Three layers of dead-client detection protect the gRPC stream path:

| Layer | Detects | Timeline |
|-------|---------|----------|
| TCP RST | Clean process death, connection close | ~1–5 s |
| gRPC keepalive PING | Network partition, silent crash | ~25 s (15 s ping delay + 10 s ping timeout at defaults) |
| Session timeout (`GrpcMaxStreamLifetime`) | Zombie streams with misconfigured keepalive | 4 h |

## Dependencies & Interactions

- [Commons (#16)](./Commons.md) — owns the message contracts used by this component: `DeployInstanceCommand`, `SiteEnvelope`, `HeartbeatMessage`, `SiteHealthReport`, `SiteHealthReportReplica`, `RegisterNotificationOutbox`, `IngestAuditEventsCommand`, `IngestCachedTelemetryCommand`, and all other request/response records. The exception is `RegisterAuditIngest`: it carries an `IActorRef`, and Commons does not hold an Akka package reference, so it lives in this project.
- [Cluster Infrastructure (#13)](./ClusterInfrastructure.md) — provides `ClusterClientReceptionist` registration and the active/standby leader model that `SiteCommunicationActor`'s `IsActive` check and `CentralCommunicationActor`'s `DistributedPubSub` fanout both depend on.
- [Configuration Database (#17)](./ConfigurationDatabase.md) — provides `ISiteRepository.GetAllSitesAsync` for address loading; site records carry `NodeAAddress`, `NodeBAddress`, `GrpcNodeAAddress`, `GrpcNodeBAddress`.
- [Deployment Manager (#2)](./DeploymentManager.md) — the primary consumer of command/control patterns 1–3. `CommunicationService` is injected into the Deployment Manager actor to send deployments, lifecycle commands, and artifact deployments to sites.
- [Site Runtime (#3)](./SiteRuntime.md) — `SiteCommunicationActor` forwards inbound commands to the `DeploymentManager` singleton proxy. `SiteStreamManager` (in Site Runtime) implements `ISiteStreamSubscriber` so `SiteStreamGrpcServer` can subscribe relay actors to instance event feeds without referencing Site Runtime directly.
- [Health Monitoring (#11)](./HealthMonitoring.md) — `CentralCommunicationActor` calls `ICentralHealthAggregator.MarkHeartbeat` and `ProcessReport` for every inbound heartbeat and health report. `DistributedPubSub` fanout keeps both central nodes' aggregators in sync.
- [Audit Log (#23)](./AuditLog.md) — `SiteStreamGrpcServer` hosts `IngestAuditEvents`, `IngestCachedTelemetry`, and `PullAuditEvents` RPCs. `CentralCommunicationActor` routes `IngestAuditEventsCommand` / `IngestCachedTelemetryCommand` ClusterClient messages to the `AuditLogIngestActor` proxy.
- [Notification Outbox (#21)](./NotificationOutbox.md) — `CentralCommunicationActor` routes `NotificationSubmit` / `NotificationStatusQuery` messages from sites to the `NotificationOutboxActor` proxy. `CommunicationService` Asks the proxy directly for central-UI outbox management calls.
- [Site Call Audit (#22)](./SiteCallAudit.md) — `CommunicationService` Asks the `SiteCallAuditActor` proxy directly for query and relay operations. `SiteCallAuditActor` issues `RetryParkedOperation` / `DiscardParkedOperation` relay commands to sites via `SiteEnvelope`; `SiteCommunicationActor` dispatches them to `_parkedMessageHandler`.
- [Store-and-Forward Engine (#6)](./StoreAndForward.md) — the site S&F Engine drives `NotificationSubmit` forwarding and cached-call telemetry emission through `SiteCommunicationActor`. Parked-message queries and retry/discard relay commands flow back the other way.
- [Management Service (#18)](./ManagementService.md) — `ManagementActor` is registered with `ClusterClientReceptionist` at `/user/management` on central; the CLI connects via its own separate `ClusterClient`. This is a distinct `ClusterClient` usage from the inter-cluster hub-and-spoke connections managed by this component.
- Design spec: [Component-Communication.md](../requirements/Component-Communication.md).

## Troubleshooting

### Every command to a site times out

Check that `NodeAAddress` and `NodeBAddress` are populated in the site configuration. If both are empty, `CentralCommunicationActor` logs a warning and skips that site on every refresh, so no `ClusterClient` is created and every command to that site times out. `CommunicationService.RefreshSiteAddresses()` triggers an on-demand refresh once an address has been added.

### Commands are timing out but the site is reachable

A single malformed address string for one site can silently prevent `ClusterClient` creation for that site while other sites are unaffected. Check the logs for a `Warning` line from `HandleSiteAddressCacheLoaded` naming the offending site. The actor's parse guard catches the `ActorPath.Parse` exception per site so the rest of the refresh proceeds.
A `Warning` from the `Status.Failure` handler in `CentralCommunicationActor` means `LoadSiteAddressesFromDb` itself threw (typically a SQL connection error); the cache is left stale until the next successful refresh.

### gRPC debug stream drops immediately after opening

`SiteStreamGrpcServer` rejects `correlation_id` values that contain characters invalid in Akka actor names (`/`, whitespace, etc.) with `StatusCode.InvalidArgument`. Verify that the calling `DebugStreamBridgeActor` generates a safe correlation ID.

After a site node failover, the `DebugStreamBridgeActor` attempts to reconnect to the other node endpoint (`_useNodeA` flips on each error). If both nodes are unreachable, the actor exhausts its 3-retry budget and calls `onTerminated`. The engineer must then restart the debug session.

### Heartbeats arrive but health reports do not

`SiteCommunicationActor` sends heartbeats and health reports via separate paths. Health reports are sent only when the site's `HealthReportSender` publishes them (every 30 s by default). If heartbeats arrive but reports do not, the health-report sender on the site may have faulted — check site-side logs for errors in `HealthReportSender`.

## Related Documentation

- [Central–Site Communication design specification](../requirements/Component-Communication.md)
- [Commons](./Commons.md)
- [Cluster Infrastructure](./ClusterInfrastructure.md)
- [Configuration Database](./ConfigurationDatabase.md)
- [Deployment Manager](./DeploymentManager.md)
- [Site Runtime](./SiteRuntime.md)
- [Health Monitoring](./HealthMonitoring.md)
- [Audit Log](./AuditLog.md)
- [Notification Outbox](./NotificationOutbox.md)
- [Site Call Audit](./SiteCallAudit.md)
- [Store-and-Forward Engine](./StoreAndForward.md)
- [Management Service](./ManagementService.md)