docs(components): reference docs batch 1/4 — Commons, ConfigurationDatabase, Communication, ClusterInfrastructure, Host, Security

This commit is contained in:
Joseph Doherty
2026-06-03 15:42:03 -04:00
parent b2770764c5
commit b89611464b
6 changed files with 1628 additions and 0 deletions
+277
View File
@@ -0,0 +1,277 @@
# CentralSite Communication
The CentralSite Communication component is the transport layer that connects the central cluster to every site cluster. It provides two independent transports — Akka.NET `ClusterClient` for command/control and gRPC server-streaming for real-time data — wired together through a pair of actors that each cluster registers with the `ClusterClientReceptionist`.
## Overview
Communication (#5) runs on every node in every cluster. The component code lives in `src/ZB.MOM.WW.ScadaBridge.Communication/`, organised as follows:
- `Actors/``CentralCommunicationActor`, `SiteCommunicationActor`, `DebugStreamBridgeActor`, `StreamRelayActor`.
- `Grpc/``SiteStreamGrpcServer`, `SiteStreamGrpcClient`, `SiteStreamGrpcClientFactory`, `ISiteStreamSubscriber`, and the proto DTO mappers.
- `Protos/``sitestream.proto` (proto source; generated C# is vendored in `SiteStreamGrpc/`).
- `CommunicationService.cs` — typed Ask-pattern façade used by callers on the central side.
- `DebugStreamService.cs` — session manager for debug stream bridge actors.
- `CommunicationOptions.cs` — configuration options class.
- `ServiceCollectionExtensions.cs` — DI registration (`AddCommunication`).
DI registration is called from the Host composition root via `AddCommunication`. The actors themselves are created inside `AkkaHostedService.RegisterCentralActors` / `RegisterSiteActors` because they must be created within the actor system, not by the DI container.
## Key Concepts
### Two transports, two concerns
| Transport | Direction | Purpose |
|-----------|-----------|---------|
| Akka.NET `ClusterClient` | bidirectional (command/control) | Deployments, lifecycle, subscribe/unsubscribe handshake, snapshots, heartbeats, health reports, telemetry, notifications |
| gRPC server-streaming (`SiteStreamService`) | site → central | Real-time attribute value and alarm state changes |
The transports are independent. A gRPC stream interruption does not affect in-flight `ClusterClient` commands, and vice versa.
### Hub-and-spoke topology
Sites do not communicate with each other. All inter-cluster traffic flows through central. Central maintains one `ClusterClient` per site; each site maintains a single `ClusterClient` pointed at both central nodes.
### `SiteEnvelope` routing
Central-side callers wrap outbound messages in a `SiteEnvelope(SiteId, Message)`. `CentralCommunicationActor` resolves the site's `ClusterClient` by `SiteId` and forwards the inner message to `/user/site-communication` on the site:
```csharp
// CommunicationService.cs — deployment pattern
public async Task<DeploymentStatusResponse> DeployInstanceAsync(
string siteId, DeployInstanceCommand command, CancellationToken cancellationToken = default)
{
var envelope = new SiteEnvelope(siteId, command);
return await GetActor().Ask<DeploymentStatusResponse>(
envelope, _options.DeploymentTimeout, cancellationToken);
}
```
`CentralCommunicationActor.HandleSiteEnvelope` extracts the inner message and routes it via the cached `ClusterClient`:
```csharp
private void HandleSiteEnvelope(SiteEnvelope envelope)
{
if (!_siteClients.TryGetValue(envelope.SiteId, out var entry))
{
_log.Warning("No ClusterClient for site {0}, cannot route message {1}",
envelope.SiteId, envelope.Message.GetType().Name);
return; // caller's Ask times out — no central buffering
}
entry.Client.Tell(
new ClusterClient.Send("/user/site-communication", envelope.Message),
Sender);
}
```
### No central buffering
If a site is unreachable when a command arrives, the caller's Ask times out. Central never queues command/control messages on behalf of a site. This is deliberate: it keeps the central coordinator stateless with respect to site availability and pushes retry responsibility to the operator or to the Store-and-Forward Engine for messages that tolerate it.
## Architecture
### Central-side: `CentralCommunicationActor`
`CentralCommunicationActor` is a `ReceiveActor` created at `/user/central-communication` and registered with `ClusterClientReceptionist` so the site's `ClusterClient` can locate it. It owns:
- A `Dictionary<string, (IActorRef Client, ImmutableHashSet<string> ContactAddresses)>` keyed by site identifier — one `ClusterClient` per site.
- A `RefreshSiteAddresses` periodic timer (60-second cadence, starting immediately). Each tick fires `LoadSiteAddressesFromDb`, which reads every `Site` row from the database, parses `NodeAAddress` and `NodeBAddress` into Akka receptionist paths (`{addr}/system/receptionist`), and pipes a `SiteAddressCacheLoaded` message back to Self. `HandleSiteAddressCacheLoaded` creates, updates, or stops `ClusterClient` actors based on the diff.
- Proxy references to `NotificationOutboxActor` and `AuditLogIngestActor` cluster singletons, injected post-construction via `RegisterNotificationOutbox` / `RegisterAuditIngest` messages from the Host. Messages that arrive before the proxy is registered are answered with a non-accepted ack (notifications) or an empty reply (audit), so the site retries without data loss.
- Fanout of `SiteHealthReport` to the peer central node via `DistributedPubSub`, keyed on the `site-health-replica` topic, so both central nodes' aggregators stay in sync regardless of which central node the site's `ClusterClient` load-balanced the report to.
`ISiteClientFactory` / `DefaultSiteClientFactory` abstract `ClusterClient` construction for testability.
### Site-side: `SiteCommunicationActor`
`SiteCommunicationActor` is a `ReceiveActor` created at `/user/site-communication` and registered with `ClusterClientReceptionist`. It owns:
- An `IActorRef? _centralClient` — the site's outbound `ClusterClient` to central. Injected post-construction via `RegisterCentralClient`.
- A `Timers`-based heartbeat (default 5-second interval, first tick after 1 second). Each tick sends a `HeartbeatMessage` with `IsActive` stamped from the Akka `Cluster` leader check — the node is active when its `MemberStatus` is `Up` and it holds cluster leadership.
- Dispatch to local handlers for every inbound command pattern. Handlers for event-log, parked-message, integration, and artifact patterns are registered post-construction via `RegisterLocalHandler`; unregistered patterns receive an inline error reply so the central Ask does not stall.
Site-to-central messages (health reports, audit batches, notification submissions) are sent via:
```csharp
_centralClient.Tell(
new ClusterClient.Send("/user/central-communication", msg), Sender);
```
The original `Sender` is forwarded as the `ClusterClient.Send` sender so any reply from central routes straight back to the waiting Ask on the site, not through `SiteCommunicationActor`.
### Address loading and the 60-second refresh
`CentralCommunicationActor` calls `ISiteRepository.GetAllSitesAsync` inside a background `Task.Run` (to avoid blocking the actor thread on a database round-trip) and pipes the result as `SiteAddressCacheLoaded`. The actor-lifecycle `CancellationTokenSource` is threaded into the repository call so a slow MS SQL query is cancelled when the actor stops.
A malformed address for one site does not abort the refresh loop — the actor catches the parse failure, logs a warning, skips that site, and processes the rest. The refresh also runs immediately on startup (`TimeSpan.Zero` initial delay) so the cache is populated before the first command arrives.
`CommunicationService.RefreshSiteAddresses()` triggers an on-demand refresh when a site record is added, edited, or deleted from the Central UI or CLI.
### gRPC real-time data transport
Real-time attribute value and alarm state changes are delivered over `SiteStreamService`, a gRPC server-streaming service defined in `sitestream.proto`.
**Site-side**`SiteStreamGrpcServer` (Kestrel HTTP/2, port 8083):
- Implements `SiteStreamService.SiteStreamServiceBase`.
- For each `SubscribeInstance` call, creates a `StreamRelayActor` (named `stream-relay-{correlationId}-{seq}`) and subscribes it to `ISiteStreamSubscriber` (implemented by `SiteStreamManager` in the Site Runtime project — `SiteStreamGrpcServer` holds only the interface so it does not reference `SiteRuntime` directly).
- Bridges events via a `BoundedChannel<SiteStreamEvent>` (capacity 1000, `DropOldest`) from the actor thread to the async gRPC write loop.
- Enforces a `GrpcMaxConcurrentStreams` limit (default 100) and a `GrpcMaxStreamLifetime` session timeout (default 4 hours) to evict zombie streams.
- Validates `correlation_id` against `ActorPath.IsValidPathElement` before use in an actor name, rejecting invalid values with `StatusCode.InvalidArgument`.
- During `CoordinatedShutdown`, `CancelAllStreams()` flips `_shuttingDown`, refuses new subscriptions with `StatusCode.Unavailable`, and cancels all active `CancellationTokenSource`s.
`StreamRelayActor` is a lightweight `ReceiveActor` that converts `AttributeValueChanged` and `AlarmStateChanged` domain events to proto `SiteStreamEvent` messages and writes them to the channel writer:
```csharp
// StreamRelayActor.cs
private void HandleAttributeValueChanged(AttributeValueChanged msg)
{
var protoEvent = new SiteStreamEvent
{
CorrelationId = _correlationId,
AttributeChanged = new AttributeValueUpdate
{
InstanceUniqueName = msg.InstanceUniqueName,
AttributePath = msg.AttributePath,
AttributeName = msg.AttributeName,
Value = ValueFormatter.FormatDisplayValue(msg.Value),
Quality = MapQuality(msg.Quality),
Timestamp = Timestamp.FromDateTimeOffset(msg.Timestamp)
}
};
WriteToChannel(protoEvent);
}
```
**Central-side**`SiteStreamGrpcClient` / `SiteStreamGrpcClientFactory`:
- `SiteStreamGrpcClientFactory` (singleton) caches one `SiteStreamGrpcClient` per site identifier. On `GetOrCreate`, it compares the cached client's `Endpoint` to the requested endpoint and atomically replaces a stale client (different endpoint — NodeA→NodeB failover flip, or an edited address) with a fresh one.
- `SiteStreamGrpcClient` opens a `GrpcChannel` with HTTP/2 keepalive (`KeepAlivePingDelay` default 15 s, `KeepAlivePingTimeout` default 10 s, `KeepAlivePingPolicy.Always`). It calls `SubscribeInstance` and reads the response stream in a background `Task.Run`, invoking `onEvent` for each received event and `onError` on any non-cancellation exception.
### Debug stream session lifecycle
`DebugStreamService` manages one `DebugStreamBridgeActor` per active debug session. On `StartStreamAsync`, it resolves the instance's site and gRPC addresses, creates the bridge actor, and holds the session in a `ConcurrentDictionary`.
`DebugStreamBridgeActor` (one per session, short-lived, no persistence):
1. In `PreStart`, sends `SubscribeDebugViewRequest` to `CentralCommunicationActor` (ClusterClient, for the initial snapshot).
2. On receiving `DebugViewSnapshot`, fires `onEvent(snapshot)` and calls `OpenGrpcStream`.
3. `OpenGrpcStream` calls `_grpcFactory.GetOrCreate(siteId, endpoint)` and launches `client.SubscribeAsync(...)` as a background task. Domain events are marshalled back to the actor via `Self.Tell` for thread safety.
4. On a gRPC error, flips to the other node endpoint and retries (first retry immediate, subsequent retries with `ReconnectDelay` default 5 s). The retry budget (`MaxRetries = 3`) is recovered only after `StabilityWindow` (default 60 s) of uninterrupted connection — a stream that delivers one event then immediately fails does not count as stable.
5. On `StopDebugStream`, cancels the gRPC subscription and sends `UnsubscribeDebugViewRequest` to the site via ClusterClient.
### Proto definition summary
```proto
// Protos/sitestream.proto
service SiteStreamService {
rpc SubscribeInstance(InstanceStreamRequest) returns (stream SiteStreamEvent);
rpc IngestAuditEvents(AuditEventBatch) returns (IngestAck);
rpc IngestCachedTelemetry(CachedTelemetryBatch) returns (IngestAck);
rpc PullAuditEvents(PullAuditEventsRequest) returns (PullAuditEventsResponse);
}
```
`SubscribeInstance` carries the real-time data stream. The other three RPCs (`IngestAuditEvents`, `IngestCachedTelemetry`, `PullAuditEvents`) serve the Audit Log component's gRPC telemetry push and reconciliation pull paths — `SiteStreamGrpcServer` hosts them on the same port because sites already listen there.
`SiteStreamEvent` uses a `oneof event { AttributeValueUpdate, AlarmStateUpdate }` discriminator. `AlarmStateUpdate` carries the full native alarm condition (fields 821) alongside the base computed-alarm fields (17), added additively so old clients ignoring unknown fields continue to work.
## Usage
Central callers interact through `CommunicationService`, which wraps each command pattern in a typed `Ask` with a per-pattern timeout:
| Pattern | Method | Timeout |
|---------|--------|---------|
| Instance deployment | `DeployInstanceAsync` | 120 s |
| Instance lifecycle | `DisableInstanceAsync`, `EnableInstanceAsync`, `DeleteInstanceAsync` | 30 s |
| Artifact deployment | `DeployArtifactsAsync` | 60 s |
| Integration routing | `RouteIntegrationCallAsync` | 30 s |
| Debug snapshot | `RequestDebugSnapshotAsync` | 30 s |
| Remote queries | `QueryEventLogsAsync`, `QueryParkedMessagesAsync`, etc. | 30 s |
| OPC UA tag browse | `BrowseNodeAsync` | 30 s |
| Notification outbox (central-local) | `QueryNotificationOutboxAsync`, `RetryNotificationAsync`, etc. | 30 s |
| Site Call Audit (central-local) | `QuerySiteCallsAsync`, `RetrySiteCallAsync`, etc. | 30 s |
Notification Outbox and Site Call Audit actors are central-local singletons — their `CommunicationService` methods Ask the proxy directly without wrapping in `SiteEnvelope`.
For real-time streaming, callers use `DebugStreamService.StartStreamAsync`, which creates a `DebugStreamBridgeActor` and returns a session handle. Ongoing events arrive via the `onEvent` callback; session teardown is via `StopStreamAsync`.
## Configuration
All options are bound from the `Communication` section via `CommunicationOptions`:
| Key | Default | Description |
|-----|---------|-------------|
| `DeploymentTimeout` | `00:02:00` | Ask timeout for instance deployment commands. |
| `LifecycleTimeout` | `00:00:30` | Ask timeout for lifecycle commands (disable, enable, delete). |
| `ArtifactDeploymentTimeout` | `00:01:00` | Ask timeout for system-wide artifact deployment. |
| `QueryTimeout` | `00:00:30` | Ask timeout for remote queries and management commands. |
| `IntegrationTimeout` | `00:00:30` | Ask timeout for integration routing and Inbound API routing. |
| `DebugViewTimeout` | `00:00:10` | Ask timeout for debug subscribe/unsubscribe handshake. |
| `NotificationForwardTimeout` | `00:00:30` | Ask timeout for notification submission forwarding. |
| `CentralContactPoints` | `[]` | Site-side: Akka addresses of central nodes, e.g. `akka.tcp://scadabridge@central-a:8081`. |
| `GrpcKeepAlivePingDelay` | `00:00:15` | HTTP/2 keepalive PING interval on `SiteStreamGrpcClient`. |
| `GrpcKeepAlivePingTimeout` | `00:00:10` | HTTP/2 keepalive PING timeout. |
| `GrpcMaxStreamLifetime` | `04:00:00` | Per-stream session timeout; forces reconnect of zombie streams. |
| `GrpcMaxConcurrentStreams` | `100` | Max concurrent `SubscribeInstance` streams per site node. |
| `TransportHeartbeatInterval` | `00:00:05` | `SiteCommunicationActor` heartbeat cadence. |
| `TransportFailureThreshold` | `00:00:15` | Akka remoting failure-detection threshold. |
Three layers of dead-client detection protect the gRPC stream path:
| Layer | Detects | Timeline |
|-------|---------|----------|
| TCP RST | Clean process death, connection close | ~15 s |
| gRPC keepalive PING | Network partition, silent crash | ~25 s |
| Session timeout (`GrpcMaxStreamLifetime`) | Zombie streams with misconfigured keepalive | 4 h |
## Dependencies & Interactions
- [Commons (#16)](./Commons.md) — owns all message contracts used by this component: `DeployInstanceCommand`, `SiteEnvelope`, `HeartbeatMessage`, `SiteHealthReport`, `SiteHealthReportReplica`, `RegisterNotificationOutbox`, `RegisterAuditIngest`, `IngestAuditEventsCommand`, `IngestCachedTelemetryCommand`, and all other request/response records. Commons does not hold an Akka package reference, so `RegisterAuditIngest` (which carries an `IActorRef`) lives in this project.
- [Cluster Infrastructure (#13)](./ClusterInfrastructure.md) — provides `ClusterClientReceptionist` registration and the active/standby leader model that `SiteCommunicationActor`'s `IsActive` check and `CentralCommunicationActor`'s `DistributedPubSub` fanout both depend on.
- [Configuration Database (#17)](./ConfigurationDatabase.md) — provides `ISiteRepository.GetAllSitesAsync` for address loading; site records carry `NodeAAddress`, `NodeBAddress`, `GrpcNodeAAddress`, `GrpcNodeBAddress`.
- [Deployment Manager (#2)](./DeploymentManager.md) — the primary consumer of command/control patterns 13. `CommunicationService` is injected into the Deployment Manager actor to send deployments, lifecycle commands, and artifact deployments to sites.
- [Site Runtime (#3)](./SiteRuntime.md) — `SiteCommunicationActor` forwards inbound commands to the `DeploymentManager` singleton proxy. `SiteStreamManager` (in Site Runtime) implements `ISiteStreamSubscriber` so `SiteStreamGrpcServer` can subscribe relay actors to instance event feeds without referencing Site Runtime directly.
- [Health Monitoring (#11)](./HealthMonitoring.md) — `CentralCommunicationActor` calls `ICentralHealthAggregator.MarkHeartbeat` and `ProcessReport` for every inbound heartbeat and health report. `DistributedPubSub` fanout keeps both central nodes' aggregators in sync.
- [Audit Log (#23)](./AuditLog.md) — `SiteStreamGrpcServer` hosts `IngestAuditEvents`, `IngestCachedTelemetry`, and `PullAuditEvents` RPCs. `CentralCommunicationActor` routes `IngestAuditEventsCommand` / `IngestCachedTelemetryCommand` ClusterClient messages to the `AuditLogIngestActor` proxy.
- [Notification Outbox (#21)](./NotificationOutbox.md) — `CentralCommunicationActor` routes `NotificationSubmit` / `NotificationStatusQuery` messages from sites to the `NotificationOutboxActor` proxy. `CommunicationService` Asks the proxy directly for central-UI outbox management calls.
- [Site Call Audit (#22)](./SiteCallAudit.md) — `CommunicationService` Asks the `SiteCallAuditActor` proxy directly for query and relay operations. `SiteCallAuditActor` issues `RetryParkedOperation` / `DiscardParkedOperation` relay commands to sites via `SiteEnvelope`; `SiteCommunicationActor` dispatches them to `_parkedMessageHandler`.
- [Store-and-Forward Engine (#6)](./StoreAndForward.md) — the site S&F Engine drives `NotificationSubmit` forwarding and cached-call telemetry emission through `SiteCommunicationActor`. Parked-message queries and retry/discard relay commands flow back the other way.
- [Management Service (#18)](./ManagementService.md) — `ManagementActor` is registered with `ClusterClientReceptionist` at `/user/management` on central; the CLI connects via its own separate `ClusterClient`. This is a distinct `ClusterClient` usage from the inter-cluster hub-and-spoke connections managed by this component.
- Design spec: [Component-Communication.md](../requirements/Component-Communication.md).
## Troubleshooting
### A site's commands fail immediately
Check that `NodeAAddress` and `NodeBAddress` are populated in the site configuration — if both are empty, `CentralCommunicationActor` logs a warning and skips that site on every refresh, so no `ClusterClient` is created and all commands timeout. `CommunicationService.RefreshSiteAddresses()` triggers an on-demand refresh after an address is added.
### Commands are timing out but the site is reachable
A single malformed address string for one site can silently prevent `ClusterClient` creation for that site while other sites are unaffected. Check the logs for a `Warning` line from `HandleSiteAddressCacheLoaded` naming the offending site. The actor parse-guard catches the `ActorPath.Parse` exception per-site so the rest of the refresh proceeds.
A `Warning` at the `Status.Failure` handler in `CentralCommunicationActor` means `LoadSiteAddressesFromDb` itself threw (typically a SQL connection error); the cache is left stale until the next successful refresh.
### gRPC debug stream drops immediately after opening
`SiteStreamGrpcServer` rejects `correlation_id` values that contain characters invalid in Akka actor names (`/`, whitespace, etc.) with `StatusCode.InvalidArgument`. Verify that the calling `DebugStreamBridgeActor` generates a safe correlation ID.
After a site node failover, the `DebugStreamBridgeActor` attempts to reconnect to the other node endpoint (`_useNodeA` flips on each error). If both nodes are unreachable, the actor exhausts its 3-retry budget and calls `onTerminated`. The engineer must restart the debug session.
### Heartbeats arrive but health reports do not
`SiteCommunicationActor` sends heartbeats and health reports via separate paths. Health reports are sent only when the site's `SiteHealthReportActor` publishes them (every 30 s by default). If heartbeats arrive but reports do not, the health reporting actor on the site may have faulted — check site-side logs for errors in `SiteHealthReportActor`.
## Related Documentation
- [CentralSite Communication design specification](../requirements/Component-Communication.md)
- [Commons](./Commons.md)
- [Cluster Infrastructure](./ClusterInfrastructure.md)
- [Configuration Database](./ConfigurationDatabase.md)
- [Deployment Manager](./DeploymentManager.md)
- [Site Runtime](./SiteRuntime.md)
- [Health Monitoring](./HealthMonitoring.md)
- [Audit Log](./AuditLog.md)
- [Notification Outbox](./NotificationOutbox.md)
- [Site Call Audit](./SiteCallAudit.md)
- [Store-and-Forward Engine](./StoreAndForward.md)
- [Management Service](./ManagementService.md)