Joseph Doherty c5fb02d640 docs(components): accuracy fixes from deep review (batch 1)
Commons (third-party dep, 7 namespaces, retired ApiKey, repo SaveChanges
carve-out), ConfigurationDatabase (5 persisted + 1 non-persisted computed col),
ClusterInfrastructure (abbreviated HOCON note, RemotingPort default),
Host (component matrix: CI/HealthMonitoring/ExternalSystemGateway have no
actors; DeadLetterMonitorActor runs on both roles), Security (Bearer not
X-API-Key; ApiKeyAdmin registered by Host), Communication (Task.Run/Sender).
2026-06-03 16:32:01 -04:00

# Central–Site Communication
The Central–Site Communication component is the transport layer that connects the central cluster to every site cluster. It provides two independent transports (Akka.NET `ClusterClient` for command/control and gRPC server-streaming for real-time data), wired together through a pair of actors that each cluster registers with the `ClusterClientReceptionist`.
## Overview
Communication (#5) runs on every node in every cluster. The component code lives in `src/ZB.MOM.WW.ScadaBridge.Communication/`, organised as follows:
- `Actors/`: `CentralCommunicationActor`, `SiteCommunicationActor`, `DebugStreamBridgeActor`, `StreamRelayActor`.
- `Grpc/`: `SiteStreamGrpcServer`, `SiteStreamGrpcClient`, `SiteStreamGrpcClientFactory`, `ISiteStreamSubscriber`, and the proto DTO mappers.
- `Protos/`: `sitestream.proto` (proto source; the generated C# is vendored in `SiteStreamGrpc/`).
- `CommunicationService.cs`: typed Ask-pattern façade used by callers on the central side.
- `DebugStreamService.cs`: session manager for debug stream bridge actors.
- `CommunicationOptions.cs`: configuration options class.
- `ServiceCollectionExtensions.cs`: DI registration (`AddCommunication`).
DI registration is called from the Host composition root via `AddCommunication`. The actors themselves are created inside `AkkaHostedService.RegisterCentralActors` / `RegisterSiteActors` because they must be created within the actor system, not by the DI container.
## Key Concepts
### Two transports, two concerns
| Transport | Direction | Purpose |
|-----------|-----------|---------|
| Akka.NET `ClusterClient` | bidirectional (command/control) | Deployments, lifecycle, subscribe/unsubscribe handshake, snapshots, heartbeats, health reports, telemetry, notifications |
| gRPC server-streaming (`SiteStreamService`) | site → central | Real-time attribute value and alarm state changes |
The transports are independent. A gRPC stream interruption does not affect in-flight `ClusterClient` commands, and vice versa.
### Hub-and-spoke topology
Sites do not communicate with each other. All inter-cluster traffic flows through central. Central maintains one `ClusterClient` per site; each site maintains a single `ClusterClient` pointed at both central nodes.
### `SiteEnvelope` routing
Central-side callers wrap outbound messages in a `SiteEnvelope(SiteId, Message)`. `CentralCommunicationActor` resolves the site's `ClusterClient` by `SiteId` and forwards the inner message to `/user/site-communication` on the site:
```csharp
// CommunicationService.cs: deployment pattern
public async Task<DeploymentStatusResponse> DeployInstanceAsync(
    string siteId, DeployInstanceCommand command, CancellationToken cancellationToken = default)
{
    // Wrap the command so CentralCommunicationActor can pick the site's ClusterClient.
    var envelope = new SiteEnvelope(siteId, command);
    return await GetActor().Ask<DeploymentStatusResponse>(
        envelope, _options.DeploymentTimeout, cancellationToken);
}
```
`CentralCommunicationActor.HandleSiteEnvelope` extracts the inner message and routes it via the cached `ClusterClient`:
```csharp
private void HandleSiteEnvelope(SiteEnvelope envelope)
{
    if (!_siteClients.TryGetValue(envelope.SiteId, out var entry))
    {
        _log.Warning("No ClusterClient for site {0}, cannot route message {1}",
            envelope.SiteId, envelope.Message.GetType().Name);
        return; // caller's Ask times out; no central buffering
    }

    // Forward the original Sender so the site's reply goes straight back to the waiting Ask.
    entry.Client.Tell(
        new ClusterClient.Send("/user/site-communication", envelope.Message),
        Sender);
}
```
### No central buffering
If a site is unreachable when a command arrives, the caller's Ask times out. Central never queues command/control messages on behalf of a site. This is deliberate: it keeps the central coordinator stateless with respect to site availability and pushes retry responsibility to the operator or to the Store-and-Forward Engine for messages that tolerate it.
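The no-buffering contract can be illustrated with plain BCL types. This is a minimal sketch, not Akka.NET's `Ask`: the `siteClients` routing table and the `AskAsync` helper are hypothetical stand-ins, a reachable site is modelled as a function that replies at once, and `Task.WaitAsync` (.NET 6+) turns the absence of a reply into a `TimeoutException`, which is the only signal an unroutable command produces.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical routing table: a missing entry models "no ClusterClient for site".
var siteClients = new Dictionary<string, Func<object, object>>
{
    ["site-1"] = msg => $"ack:{msg}" // a reachable site replies immediately
};

// Hypothetical Ask-like helper: completes only if a reply arrives before the timeout.
async Task<object> AskAsync(string siteId, object message, TimeSpan timeout)
{
    var reply = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
    if (siteClients.TryGetValue(siteId, out var deliver))
        reply.SetResult(deliver(message));
    // Otherwise: log and drop, like HandleSiteEnvelope. Nothing is queued, no error reply.
    return await reply.Task.WaitAsync(timeout);
}

var ok = await AskAsync("site-1", "deploy", TimeSpan.FromSeconds(1));
Console.WriteLine(ok); // ack:deploy

string outcome;
try
{
    await AskAsync("site-2", "deploy", TimeSpan.FromMilliseconds(100));
    outcome = "replied";
}
catch (TimeoutException)
{
    outcome = "timed out"; // the caller, not central, decides whether to retry
}
Console.WriteLine(outcome); // timed out
```

Because the missing-site branch never replies, the caller waits for the full per-pattern timeout (for example `DeploymentTimeout`) before seeing the failure.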
## Architecture
### Central-side: `CentralCommunicationActor`
`CentralCommunicationActor` is a `ReceiveActor` created at `/user/central-communication` and registered with `ClusterClientReceptionist` so the site's `ClusterClient` can locate it. It owns:
- A `Dictionary<string, (IActorRef Client, ImmutableHashSet<string> ContactAddresses)>` keyed by site identifier — one `ClusterClient` per site.
- A `RefreshSiteAddresses` periodic timer (60-second cadence, starting immediately). Each tick fires `LoadSiteAddressesFromDb`, which reads every `Site` row from the database, parses `NodeAAddress` and `NodeBAddress` into Akka receptionist paths (`{addr}/system/receptionist`), and pipes a `SiteAddressCacheLoaded` message back to Self. `HandleSiteAddressCacheLoaded` creates, updates, or stops `ClusterClient` actors based on the diff.
- Proxy references to `NotificationOutboxActor` and `AuditLogIngestActor` cluster singletons, injected post-construction via `RegisterNotificationOutbox` / `RegisterAuditIngest` messages from the Host. Messages that arrive before the proxy is registered are answered with a non-accepted ack (notifications) or an empty reply (audit), so the site retries without data loss.
- Fanout of `SiteHealthReport` to the peer central node via `DistributedPubSub` on the `site-health-replica` topic, so both central nodes' aggregators stay in sync regardless of which central node the site's `ClusterClient` delivered the report to.
`ISiteClientFactory` / `DefaultSiteClientFactory` abstract `ClusterClient` construction for testability.
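The create/update/stop decision in `HandleSiteAddressCacheLoaded` amounts to a set diff over contact paths. A minimal sketch, assuming the cached and freshly loaded contact sets are available as dictionaries keyed by site identifier; the `Diff` and `Receptionist` helpers, the action strings, and the hostnames are hypothetical, and the real actor stops and creates `ClusterClient` actors rather than returning a plan.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;

// Hypothetical diff: per site, decide whether to create, replace, or stop its ClusterClient.
List<(string SiteId, string Action)> Diff(
    IReadOnlyDictionary<string, ImmutableHashSet<string>> cached,
    IReadOnlyDictionary<string, ImmutableHashSet<string>> loaded)
{
    var plan = new List<(string SiteId, string Action)>();
    foreach (var (siteId, contacts) in loaded)
    {
        if (!cached.TryGetValue(siteId, out var previous))
            plan.Add((siteId, "create"));   // new site row
        else if (!previous.SetEquals(contacts))
            plan.Add((siteId, "replace"));  // stop old client, start one with the new contacts
        // identical contact set: keep the existing client untouched
    }
    foreach (var siteId in cached.Keys.Where(id => !loaded.ContainsKey(id)))
        plan.Add((siteId, "stop"));         // site deleted or no longer has a valid address
    return plan;
}

string Receptionist(string address) => $"{address}/system/receptionist";

var cachedContacts = new Dictionary<string, ImmutableHashSet<string>>
{
    ["site-1"] = ImmutableHashSet.Create(Receptionist("akka.tcp://scadabridge@site1-a:8081")),
    ["site-2"] = ImmutableHashSet.Create(Receptionist("akka.tcp://scadabridge@site2-a:8081")),
    ["site-3"] = ImmutableHashSet.Create(Receptionist("akka.tcp://scadabridge@site3-a:8081")),
};
var loadedContacts = new Dictionary<string, ImmutableHashSet<string>>
{
    ["site-1"] = cachedContacts["site-1"],                                                     // unchanged
    ["site-2"] = ImmutableHashSet.Create(Receptionist("akka.tcp://scadabridge@site2-b:8081")), // edited
    ["site-4"] = ImmutableHashSet.Create(Receptionist("akka.tcp://scadabridge@site4-a:8081")), // added
};

var steps = Diff(cachedContacts, loadedContacts);
foreach (var (site, action) in steps)
    Console.WriteLine($"{site}: {action}");
```

Comparing whole contact sets means a single-node edit still replaces the client; this fits Akka.NET, where a client's initial contacts are part of the `ClusterClientSettings` it is created with.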
### Site-side: `SiteCommunicationActor`
`SiteCommunicationActor` is a `ReceiveActor` created at `/user/site-communication` and registered with `ClusterClientReceptionist`. It owns:
- An `IActorRef? _centralClient` — the site's outbound `ClusterClient` to central. Injected post-construction via `RegisterCentralClient`.
- A `Timers`-based heartbeat (default 5-second interval, first tick after 1 second). Each tick sends a `HeartbeatMessage` with `IsActive` stamped from the Akka `Cluster` leader check — the node is active when its `MemberStatus` is `Up` and it holds cluster leadership.
- Dispatch to local handlers for every inbound command pattern. Handlers for event-log, parked-message, integration, and artifact patterns are registered post-construction via `RegisterLocalHandler`; unregistered patterns receive an inline error reply so the central Ask does not stall.
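The rule that unregistered patterns still get a reply can be sketched as a lookup whose miss path answers inline. The sketch keys handlers by a pattern name for brevity (the actor itself matches on message types), and `RegisterLocalHandler`/`Dispatch` here are hypothetical helpers that return strings instead of sending Akka replies.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical registry mirroring RegisterLocalHandler: pattern name -> handler.
var handlers = new Dictionary<string, Func<string, string>>();

void RegisterLocalHandler(string pattern, Func<string, string> handler) =>
    handlers[pattern] = handler;

// Every inbound command gets a reply, so the central Ask never waits out its timeout.
string Dispatch(string pattern, string payload) =>
    handlers.TryGetValue(pattern, out var registered)
        ? registered(payload)
        : $"error: no handler registered for '{pattern}'";

// Only the event-log handler has been registered so far (e.g. early in startup).
RegisterLocalHandler("event-log", query => $"event-log result for {query}");

Console.WriteLine(Dispatch("event-log", "severity>=warning")); // event-log result for severity>=warning
Console.WriteLine(Dispatch("parked-message", "list"));         // error: no handler registered for 'parked-message'
```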
Site-to-central messages (health reports, audit batches, notification submissions) are sent via:
```csharp
// Request/response: forward the original Sender so central's reply reaches the waiting Ask.
_centralClient.Tell(
    new ClusterClient.Send("/user/central-communication", msg), Sender);
```
For request/response messages, the original `Sender` is forwarded as the `ClusterClient.Send` sender so any reply from central routes straight back to the waiting Ask on the site, not through `SiteCommunicationActor`. For fire-and-forget messages (e.g. `SiteHealthReport`), `Self` is used as the sender instead, because no reply is expected.
### Address loading and the 60-second refresh
`CentralCommunicationActor` calls `ISiteRepository.GetAllSitesAsync` inside a background `Task.Run` (to avoid blocking the actor thread on a database round-trip) and pipes the result as `SiteAddressCacheLoaded`. The actor-lifecycle `CancellationTokenSource` is threaded into the repository call so a slow MS SQL query is cancelled when the actor stops.
A malformed address for one site does not abort the refresh loop — the actor catches the parse failure, logs a warning, skips that site, and processes the rest. The refresh also runs immediately on startup (`TimeSpan.Zero` initial delay) so the cache is populated before the first command arrives.
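The per-site guard can be sketched with BCL types only. A minimal sketch, assuming a deliberately simplified regular expression in place of `ActorPath.Parse` (it accepts only `akka.tcp://<system>@<host>:<port>`); `ToReceptionistPath`, `BuildContacts`, and the sample hostnames are hypothetical. A site with no usable address is skipped with a warning, matching the troubleshooting case where both `NodeAAddress` and `NodeBAddress` are empty.

```csharp
using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;

// Simplified, hypothetical stand-in for ActorPath.Parse validation.
var akkaAddress = new Regex(@"^akka\.tcp://[A-Za-z0-9_-]+@[A-Za-z0-9.-]+:\d{1,5}$");

string ToReceptionistPath(string address)
{
    var trimmed = address.Trim().TrimEnd('/');
    if (!akkaAddress.IsMatch(trimmed))
        throw new FormatException($"malformed Akka address '{address}'");
    return $"{trimmed}/system/receptionist";
}

// A bad row must not abort the refresh: catch per site, warn, move on.
Dictionary<string, List<string>> BuildContacts(
    IEnumerable<(string SiteId, string? NodeA, string? NodeB)> sites)
{
    var result = new Dictionary<string, List<string>>();
    foreach (var (siteId, nodeA, nodeB) in sites)
    {
        try
        {
            var paths = new List<string>();
            foreach (var address in new[] { nodeA, nodeB })
                if (!string.IsNullOrWhiteSpace(address))
                    paths.Add(ToReceptionistPath(address));

            if (paths.Count == 0)
                Console.WriteLine($"WARN site {siteId}: no NodeA/NodeB address, skipped");
            else
                result[siteId] = paths;
        }
        catch (FormatException ex)
        {
            Console.WriteLine($"WARN site {siteId}: {ex.Message}, skipped");
        }
    }
    return result;
}

var siteContacts = BuildContacts(new (string, string?, string?)[]
{
    ("site-1", "akka.tcp://scadabridge@site1-a:8081", "akka.tcp://scadabridge@site1-b:8081"),
    ("site-2", "site2-a:8081", null), // malformed: no scheme or system name
    ("site-3", null, null),           // both addresses empty
});
Console.WriteLine(string.Join(", ", siteContacts.Keys)); // site-1
```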
`CommunicationService.RefreshSiteAddresses()` triggers an on-demand refresh when a site record is added, edited, or deleted from the Central UI or CLI.
### gRPC real-time data transport
Real-time attribute value and alarm state changes are delivered over `SiteStreamService`, a gRPC server-streaming service defined in `sitestream.proto`.
**Site-side: `SiteStreamGrpcServer`** (Kestrel HTTP/2, port 8083):
- Implements `SiteStreamService.SiteStreamServiceBase`.
- For each `SubscribeInstance` call, creates a `StreamRelayActor` (named `stream-relay-{correlationId}-{seq}`) and subscribes it to `ISiteStreamSubscriber` (implemented by `SiteStreamManager` in the Site Runtime project — `SiteStreamGrpcServer` holds only the interface so it does not reference `SiteRuntime` directly).
- Bridges events via a `BoundedChannel<SiteStreamEvent>` (capacity 1000, `DropOldest`) from the actor thread to the async gRPC write loop.
- Enforces a `GrpcMaxConcurrentStreams` limit (default 100) and a `GrpcMaxStreamLifetime` session timeout (default 4 hours) to evict zombie streams.
- Validates `correlation_id` against `ActorPath.IsValidPathElement` before use in an actor name, rejecting invalid values with `StatusCode.InvalidArgument`.
- During `CoordinatedShutdown`, `CancelAllStreams()` flips `_shuttingDown`, refuses new subscriptions with `StatusCode.Unavailable`, and cancels all active `CancellationTokenSource`s.
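The relay channel's overflow behaviour is standard `System.Threading.Channels` semantics. The snippet uses a capacity of 3 instead of 1000 so the effect is visible; the `SingleWriter`/`SingleReader` hints are an assumption based on the one-relay-actor, one-write-loop shape described above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Same shape as the relay channel (bounded, DropOldest), but capacity 3 instead of 1000.
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(3)
{
    FullMode = BoundedChannelFullMode.DropOldest,
    SingleWriter = true, // assumed: one StreamRelayActor writes
    SingleReader = true, // assumed: one gRPC write loop reads
});

// Actor side: TryWrite never blocks; when full, the oldest queued item is evicted.
for (var i = 1; i <= 5; i++)
    channel.Writer.TryWrite(i);
channel.Writer.Complete(); // e.g. the stream was cancelled

// gRPC side: drain until the writer completes.
var delivered = new List<int>();
await foreach (var item in channel.Reader.ReadAllAsync())
    delivered.Add(item);

Console.WriteLine(string.Join(",", delivered)); // 3,4,5
```

A slow gRPC consumer therefore loses the oldest values rather than stalling the actor thread, which suits a live view where the latest state matters most.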
`StreamRelayActor` is a lightweight `ReceiveActor` that converts `AttributeValueChanged` and `AlarmStateChanged` domain events to proto `SiteStreamEvent` messages and writes them to the channel writer:
```csharp
// StreamRelayActor.cs
private void HandleAttributeValueChanged(AttributeValueChanged msg)
{
    // Map the domain event onto the proto oneof branch for attribute updates.
    var protoEvent = new SiteStreamEvent
    {
        CorrelationId = _correlationId,
        AttributeChanged = new AttributeValueUpdate
        {
            InstanceUniqueName = msg.InstanceUniqueName,
            AttributePath = msg.AttributePath,
            AttributeName = msg.AttributeName,
            Value = ValueFormatter.FormatDisplayValue(msg.Value),
            Quality = MapQuality(msg.Quality),
            Timestamp = Timestamp.FromDateTimeOffset(msg.Timestamp) // Google.Protobuf.WellKnownTypes
        }
    };

    WriteToChannel(protoEvent);
}
```
**Central-side: `SiteStreamGrpcClient` / `SiteStreamGrpcClientFactory`**:
- `SiteStreamGrpcClientFactory` (singleton) caches one `SiteStreamGrpcClient` per site identifier. On `GetOrCreate`, it compares the cached client's `Endpoint` to the requested endpoint and atomically replaces a stale client (different endpoint — NodeA→NodeB failover flip, or an edited address) with a fresh one.
- `SiteStreamGrpcClient` opens a `GrpcChannel` with HTTP/2 keepalive (`KeepAlivePingDelay` default 15 s, `KeepAlivePingTimeout` default 10 s, `KeepAlivePingPolicy.Always`). `SubscribeAsync` is a plain `async Task` that calls `SubscribeInstance` and reads the response stream with `await foreach`, invoking `onEvent` for each received event and `onError` on any non-cancellation exception. The caller (`DebugStreamBridgeActor.OpenGrpcStream`) launches it inside a `Task.Run` so the long-running stream loop runs off the actor thread.
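The stale-client check in `GetOrCreate` can be sketched as a compare-and-replace under a lock. This is a minimal sketch, not the factory's code: a `(Endpoint, Generation)` tuple stands in for `SiteStreamGrpcClient`, the `disposed` list stands in for shutting down the old channel, the endpoints are hypothetical, and a plain `lock` is simply one way to make the comparison and the replacement a single step.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins for the cached clients and for disposed channels.
var clients = new Dictionary<string, (string Endpoint, int Generation)>();
var disposed = new List<string>();
var gate = new object();
var generation = 0;

(string Endpoint, int Generation) GetOrCreate(string siteId, string endpoint)
{
    lock (gate) // compare and replace as one step, so racing callers never both create
    {
        if (clients.TryGetValue(siteId, out var cached))
        {
            if (cached.Endpoint == endpoint)
                return cached;             // same endpoint: reuse the open channel
            disposed.Add(cached.Endpoint); // NodeA->NodeB flip or edited address
        }
        var fresh = (Endpoint: endpoint, Generation: ++generation); // a new GrpcChannel would open here
        clients[siteId] = fresh;
        return fresh;
    }
}

var a1 = GetOrCreate("site-1", "http://site1-a:8083");
var a2 = GetOrCreate("site-1", "http://site1-a:8083"); // cache hit
var b1 = GetOrCreate("site-1", "http://site1-b:8083"); // failover: stale client replaced
Console.WriteLine($"{a1 == a2} {a1 == b1} {string.Join(",", disposed)}");
// True False http://site1-a:8083
```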
### Debug stream session lifecycle
`DebugStreamService` manages one `DebugStreamBridgeActor` per active debug session. On `StartStreamAsync`, it resolves the instance's site and gRPC addresses, creates the bridge actor, and holds the session in a `ConcurrentDictionary`.
`DebugStreamBridgeActor` (one per session, short-lived, no persistence):
1. In `PreStart`, sends `SubscribeDebugViewRequest` to `CentralCommunicationActor`, which relays it to the site over `ClusterClient`, to obtain the initial snapshot.
2. On receiving `DebugViewSnapshot`, fires `onEvent(snapshot)` and calls `OpenGrpcStream`.
3. `OpenGrpcStream` calls `_grpcFactory.GetOrCreate(siteId, endpoint)` and launches `client.SubscribeAsync(...)` as a background task. Domain events are marshalled back to the actor via `Self.Tell` for thread safety.
4. On a gRPC error, flips to the other node endpoint and retries (the first retry is immediate; subsequent retries wait `ReconnectDelay`, default 5 s). The retry budget (`MaxRetries = 3`) is reset only after `StabilityWindow` (default 60 s) of uninterrupted connection: a stream that delivers one event and then immediately fails does not count as stable.
5. On `StopDebugStream`, cancels the gRPC subscription and sends `UnsubscribeDebugViewRequest` to the site via ClusterClient.
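The reconnect policy in steps 4 and 5 can be sketched as a small state machine driven by an explicit clock, which makes the stability rule easy to test. A minimal sketch, assuming the documented defaults (`MaxRetries = 3`, `StabilityWindow` 60 s, `ReconnectDelay` 5 s); `OnConnected`, `OnStreamError`, and the timestamps are hypothetical, and a `null` return marks the point where the bridge actor would call `onTerminated`.

```csharp
using System;

// Documented defaults: MaxRetries = 3, StabilityWindow 60 s, ReconnectDelay 5 s.
const int MaxRetries = 3;
var stabilityWindow = TimeSpan.FromSeconds(60);
var reconnectDelay = TimeSpan.FromSeconds(5);

var useNodeA = true;          // which site node the next attempt targets
var retriesUsed = 0;
DateTime? connectedAt = null; // start of the current uninterrupted connection

void OnConnected(DateTime now) => connectedAt = now;

// Returns the delay before the next attempt, or null once the budget is spent.
TimeSpan? OnStreamError(DateTime now)
{
    // Only an uninterrupted run of at least StabilityWindow refills the budget.
    if (connectedAt is { } since && now - since >= stabilityWindow)
        retriesUsed = 0;
    connectedAt = null;

    if (retriesUsed >= MaxRetries)
        return null;          // the bridge actor would call onTerminated here
    useNodeA = !useNodeA;     // fail over to the other site node
    retriesUsed++;
    return retriesUsed == 1 ? TimeSpan.Zero : reconnectDelay;
}

var t0 = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc);
OnConnected(t0);
var d1 = OnStreamError(t0.AddSeconds(2)); // connected only 2 s: not stable
OnConnected(t0.AddSeconds(2));
var d2 = OnStreamError(t0.AddSeconds(3));
OnConnected(t0.AddSeconds(8));
var d3 = OnStreamError(t0.AddSeconds(9));
var d4 = OnStreamError(t0.AddSeconds(14)); // the reconnect itself failed
var status = d4 is null ? "terminated" : "retrying";
Console.WriteLine($"{d1} {d2} {d3} {status}"); // 00:00:00 00:00:05 00:00:05 terminated
```

Because the budget is refilled only when an error follows a long enough connected run, a node that accepts the stream and fails within seconds keeps consuming retries until the session ends.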
### Proto definition summary
```proto
// Protos/sitestream.proto
service SiteStreamService {
  rpc SubscribeInstance(InstanceStreamRequest) returns (stream SiteStreamEvent);
  rpc IngestAuditEvents(AuditEventBatch) returns (IngestAck);
  rpc IngestCachedTelemetry(CachedTelemetryBatch) returns (IngestAck);
  rpc PullAuditEvents(PullAuditEventsRequest) returns (PullAuditEventsResponse);
}
```
`SubscribeInstance` carries the real-time data stream. The other three RPCs (`IngestAuditEvents`, `IngestCachedTelemetry`, `PullAuditEvents`) serve the Audit Log component's gRPC telemetry push and reconciliation pull paths — `SiteStreamGrpcServer` hosts them on the same port because sites already listen there.
`SiteStreamEvent` uses a `oneof event { AttributeValueUpdate, AlarmStateUpdate }` discriminator. `AlarmStateUpdate` carries the full native alarm condition (fields 8–21) alongside the base computed-alarm fields (1–7). The native fields were added under new field numbers, so older clients, which ignore unknown fields, continue to work.
## Usage
Central callers interact through `CommunicationService`, which wraps each command pattern in a typed `Ask` with a per-pattern timeout:
| Pattern | Method | Timeout |
|---------|--------|---------|
| Instance deployment | `DeployInstanceAsync` | 120 s |
| Instance lifecycle | `DisableInstanceAsync`, `EnableInstanceAsync`, `DeleteInstanceAsync` | 30 s |
| Artifact deployment | `DeployArtifactsAsync` | 60 s |
| Integration routing | `RouteIntegrationCallAsync` | 30 s |
| Debug snapshot | `RequestDebugSnapshotAsync` | 30 s |
| Remote queries | `QueryEventLogsAsync`, `QueryParkedMessagesAsync`, etc. | 30 s |
| OPC UA tag browse | `BrowseNodeAsync` | 30 s |
| Notification outbox (central-local) | `QueryNotificationOutboxAsync`, `RetryNotificationAsync`, etc. | 30 s |
| Site Call Audit (central-local) | `QuerySiteCallsAsync`, `RetrySiteCallAsync`, etc. | 30 s |
Notification Outbox and Site Call Audit actors are central-local singletons — their `CommunicationService` methods Ask the proxy directly without wrapping in `SiteEnvelope`.
For real-time streaming, callers use `DebugStreamService.StartStreamAsync`, which creates a `DebugStreamBridgeActor` and returns a session handle. Ongoing events arrive via the `onEvent` callback; session teardown is via `StopStreamAsync`.
## Configuration
All options are bound from the `Communication` section via `CommunicationOptions`:
| Key | Default | Description |
|-----|---------|-------------|
| `DeploymentTimeout` | `00:02:00` | Ask timeout for instance deployment commands. |
| `LifecycleTimeout` | `00:00:30` | Ask timeout for lifecycle commands (disable, enable, delete). |
| `ArtifactDeploymentTimeout` | `00:01:00` | Ask timeout for system-wide artifact deployment. |
| `QueryTimeout` | `00:00:30` | Ask timeout for remote queries and management commands. |
| `IntegrationTimeout` | `00:00:30` | Ask timeout for integration routing and Inbound API routing. |
| `DebugViewTimeout` | `00:00:10` | Ask timeout for debug subscribe/unsubscribe handshake. |
| `NotificationForwardTimeout` | `00:00:30` | Ask timeout for notification submission forwarding. |
| `CentralContactPoints` | `[]` | Site-side: Akka addresses of central nodes, e.g. `akka.tcp://scadabridge@central-a:8081`. |
| `GrpcKeepAlivePingDelay` | `00:00:15` | HTTP/2 keepalive PING interval on `SiteStreamGrpcClient`. |
| `GrpcKeepAlivePingTimeout` | `00:00:10` | HTTP/2 keepalive PING timeout. |
| `GrpcMaxStreamLifetime` | `04:00:00` | Per-stream session timeout; forces reconnect of zombie streams. |
| `GrpcMaxConcurrentStreams` | `100` | Max concurrent `SubscribeInstance` streams per site node. |
| `TransportHeartbeatInterval` | `00:00:05` | `SiteCommunicationActor` heartbeat cadence. |
| `TransportFailureThreshold` | `00:00:15` | Akka remoting failure-detection threshold. |
Three layers of dead-client detection protect the gRPC stream path:
| Layer | Detects | Timeline |
|-------|---------|----------|
| TCP RST | Clean process death, connection close | ~15 s |
| gRPC keepalive PING | Network partition, silent crash | ~25 s |
| Session timeout (`GrpcMaxStreamLifetime`) | Zombie streams with misconfigured keepalive | 4 h |
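The keepalive options map onto .NET's `SocketsHttpHandler`, and the ~25 s figure in the table is their sum. A minimal sketch, assuming `SiteStreamGrpcClient` configures its channel this way (the document states the values and the `Always` policy, not the handler wiring); only BCL types are used, and the `GrpcChannelOptions.HttpHandler` hookup from the Grpc.Net.Client package appears only as a comment.

```csharp
using System;
using System.Net.Http;

// Documented defaults from the Communication section.
var pingDelay = TimeSpan.Parse("00:00:15");   // GrpcKeepAlivePingDelay
var pingTimeout = TimeSpan.Parse("00:00:10"); // GrpcKeepAlivePingTimeout

// Assumed wiring: HTTP/2 keepalive lives on SocketsHttpHandler (.NET 5+).
var handler = new SocketsHttpHandler
{
    KeepAlivePingDelay = pingDelay,                       // PING after this much inbound silence
    KeepAlivePingTimeout = pingTimeout,                   // close the connection if nothing comes back in time
    KeepAlivePingPolicy = HttpKeepAlivePingPolicy.Always, // ping even when no call is active
};
// With Grpc.Net.Client: GrpcChannel.ForAddress(endpoint,
//     new GrpcChannelOptions { HttpHandler = handler });

// Worst case for a silently dead peer: a full idle delay, then the PING timeout.
var detection = handler.KeepAlivePingDelay + handler.KeepAlivePingTimeout;
Console.WriteLine(detection); // 00:00:25
```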
## Dependencies & Interactions
- [Commons (#16)](./Commons.md) — owns the message contracts used by this component, such as `DeployInstanceCommand`, `SiteEnvelope`, `HeartbeatMessage`, `SiteHealthReport`, `SiteHealthReportReplica`, `RegisterNotificationOutbox`, `IngestAuditEventsCommand`, `IngestCachedTelemetryCommand`, and the other request/response records. The exception is `RegisterAuditIngest`: it carries an `IActorRef`, and Commons does not hold an Akka package reference, so that message lives in this project.
- [Cluster Infrastructure (#13)](./ClusterInfrastructure.md) — provides `ClusterClientReceptionist` registration and the active/standby leader model that `SiteCommunicationActor`'s `IsActive` check and `CentralCommunicationActor`'s `DistributedPubSub` fanout both depend on.
- [Configuration Database (#17)](./ConfigurationDatabase.md) — provides `ISiteRepository.GetAllSitesAsync` for address loading; site records carry `NodeAAddress`, `NodeBAddress`, `GrpcNodeAAddress`, `GrpcNodeBAddress`.
- [Deployment Manager (#2)](./DeploymentManager.md) — the primary consumer of command/control patterns 1–3. `CommunicationService` is injected into the Deployment Manager actor to send deployments, lifecycle commands, and artifact deployments to sites.
- [Site Runtime (#3)](./SiteRuntime.md) — `SiteCommunicationActor` forwards inbound commands to the `DeploymentManager` singleton proxy. `SiteStreamManager` (in Site Runtime) implements `ISiteStreamSubscriber` so `SiteStreamGrpcServer` can subscribe relay actors to instance event feeds without referencing Site Runtime directly.
- [Health Monitoring (#11)](./HealthMonitoring.md) — `CentralCommunicationActor` calls `ICentralHealthAggregator.MarkHeartbeat` and `ProcessReport` for every inbound heartbeat and health report. `DistributedPubSub` fanout keeps both central nodes' aggregators in sync.
- [Audit Log (#23)](./AuditLog.md) — `SiteStreamGrpcServer` hosts `IngestAuditEvents`, `IngestCachedTelemetry`, and `PullAuditEvents` RPCs. `CentralCommunicationActor` routes `IngestAuditEventsCommand` / `IngestCachedTelemetryCommand` ClusterClient messages to the `AuditLogIngestActor` proxy.
- [Notification Outbox (#21)](./NotificationOutbox.md) — `CentralCommunicationActor` routes `NotificationSubmit` / `NotificationStatusQuery` messages from sites to the `NotificationOutboxActor` proxy. `CommunicationService` Asks the proxy directly for central-UI outbox management calls.
- [Site Call Audit (#22)](./SiteCallAudit.md) — `CommunicationService` Asks the `SiteCallAuditActor` proxy directly for query and relay operations. `SiteCallAuditActor` issues `RetryParkedOperation` / `DiscardParkedOperation` relay commands to sites via `SiteEnvelope`; `SiteCommunicationActor` dispatches them to `_parkedMessageHandler`.
- [Store-and-Forward Engine (#6)](./StoreAndForward.md) — the site S&F Engine drives `NotificationSubmit` forwarding and cached-call telemetry emission through `SiteCommunicationActor`. Parked-message queries and retry/discard relay commands flow back the other way.
- [Management Service (#18)](./ManagementService.md) — `ManagementActor` is registered with `ClusterClientReceptionist` at `/user/management` on central; the CLI connects via its own separate `ClusterClient`. This is a distinct `ClusterClient` usage from the inter-cluster hub-and-spoke connections managed by this component.
- Design spec: [Component-Communication.md](../requirements/Component-Communication.md).
## Troubleshooting
### A site's commands fail immediately
Check that `NodeAAddress` and `NodeBAddress` are populated in the site configuration. If both are empty, `CentralCommunicationActor` logs a warning and skips that site on every refresh, so no `ClusterClient` is created and every command to that site times out. After adding an address, call `CommunicationService.RefreshSiteAddresses()` to refresh immediately instead of waiting for the next 60-second tick.
### Commands are timing out but the site is reachable
A single malformed address string for one site can prevent `ClusterClient` creation for that site while other sites are unaffected; callers see only a timeout. Check the logs for a `Warning` line from `HandleSiteAddressCacheLoaded` naming the offending site. The actor's parse guard catches the `ActorPath.Parse` exception per site, so the rest of the refresh proceeds.
A `Warning` logged by the `Status.Failure` handler in `CentralCommunicationActor` means `LoadSiteAddressesFromDb` itself threw (typically a SQL connection error); the cache stays stale until the next successful refresh.
### gRPC debug stream drops immediately after opening
`SiteStreamGrpcServer` rejects `correlation_id` values that contain characters invalid in Akka actor names (`/`, whitespace, etc.) with `StatusCode.InvalidArgument`. Verify that the calling `DebugStreamBridgeActor` generates a safe correlation ID.
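One way to avoid the `InvalidArgument` rejection is to generate correlation IDs from a restricted alphabet. A minimal sketch, assuming a deliberately conservative character set: `IsSafeCorrelationId` is a hypothetical approximation of `ActorPath.IsValidPathElement` (the real check accepts more characters), and `RelayActorName` reproduces the documented `stream-relay-{correlationId}-{seq}` naming. A `Guid` formatted with `"N"` contains only hex digits, so it always passes.

```csharp
using System;
using System.Text.RegularExpressions;

// Conservative, hypothetical approximation of ActorPath.IsValidPathElement: letters,
// digits, '-', '_' and '.' only. The real Akka check accepts more characters; this one
// also never admits '$', which Akka treats as reserved at the start of a name.
bool IsSafeCorrelationId(string? id) =>
    !string.IsNullOrEmpty(id) && Regex.IsMatch(id, @"^[A-Za-z0-9_.-]+$");

// Documented relay naming; an unsafe ID is rejected before any actor is created
// (the server answers such requests with StatusCode.InvalidArgument).
string RelayActorName(string correlationId, long seq) =>
    IsSafeCorrelationId(correlationId)
        ? $"stream-relay-{correlationId}-{seq}"
        : throw new ArgumentException($"invalid correlation_id '{correlationId}'");

var generated = Guid.NewGuid().ToString("N");             // 32 hex digits
Console.WriteLine(IsSafeCorrelationId(generated));         // True
Console.WriteLine(IsSafeCorrelationId("debug/session 1")); // False
```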
After a site node failover, the `DebugStreamBridgeActor` attempts to reconnect to the other node endpoint (`_useNodeA` flips on each error). If both nodes are unreachable, the actor exhausts its 3-retry budget and calls `onTerminated`. The engineer must restart the debug session.
### Heartbeats arrive but health reports do not
`SiteCommunicationActor` sends heartbeats and health reports via separate paths. Health reports are sent only when the site's `HealthReportSender` publishes them (every 30 s by default). If heartbeats arrive but reports do not, the health-report sender on the site may have faulted — check site-side logs for errors in `HealthReportSender`.
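The heartbeat-versus-report distinction can be turned into a quick triage rule. A minimal sketch, assuming the default 5 s heartbeat and 30 s report cadences; the `Triage` helper and its two-missed-intervals staleness threshold are hypothetical, not part of Health Monitoring.

```csharp
using System;

// Hypothetical triage helper; the "two missed intervals" threshold is an assumption.
var heartbeatInterval = TimeSpan.FromSeconds(5); // TransportHeartbeatInterval default
var reportInterval = TimeSpan.FromSeconds(30);   // HealthReportSender default cadence

string Triage(DateTime now, DateTime lastHeartbeat, DateTime lastReport)
{
    var heartbeatsFresh = now - lastHeartbeat <= 2 * heartbeatInterval;
    var reportsFresh = now - lastReport <= 2 * reportInterval;
    return (heartbeatsFresh, reportsFresh) switch
    {
        (true, true) => "healthy",
        (true, false) => "transport up, check HealthReportSender on the site",
        (false, _) => "transport down, check ClusterClient connectivity",
    };
}

var checkedAt = new DateTime(2026, 1, 1, 12, 0, 0, DateTimeKind.Utc);
Console.WriteLine(Triage(checkedAt, checkedAt.AddSeconds(-3), checkedAt.AddMinutes(-5)));
// transport up, check HealthReportSender on the site
```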
## Related Documentation
- [Central–Site Communication design specification](../requirements/Component-Communication.md)
- [Commons](./Commons.md)
- [Cluster Infrastructure](./ClusterInfrastructure.md)
- [Configuration Database](./ConfigurationDatabase.md)
- [Deployment Manager](./DeploymentManager.md)
- [Site Runtime](./SiteRuntime.md)
- [Health Monitoring](./HealthMonitoring.md)
- [Audit Log](./AuditLog.md)
- [Notification Outbox](./NotificationOutbox.md)
- [Site Call Audit](./SiteCallAudit.md)
- [Store-and-Forward Engine](./StoreAndForward.md)
- [Management Service](./ManagementService.md)