docs: add gRPC streaming channel design plan for site→central real-time data

Replaces ClusterClient-based event streaming with dedicated gRPC server-streaming
channels. Covers proto definition, server/client patterns, Channel<T> bridging,
keepalive/orphan prevention, failover scenarios, port/address configuration,
extensibility guide for new event types, testing strategy, and implementation guardrails.
Joseph Doherty
2026-03-21 11:26:09 -04:00
parent fd2e96fea2
commit 41aff339b2

docs/plans/grpc_streams.md (new file, 976 lines)
# gRPC Streaming Channel: Site → Central Real-Time Data
## Context
Debug streaming events currently flow through Akka.NET ClusterClient (`InstanceActor → SiteCommunicationActor → ClusterClient.Send → CentralCommunicationActor → bridge actor`). ClusterClient wasn't built for high-throughput value streaming — it's a cluster coordination tool with gossip-based routing. As we scale beyond debug view to health streaming, alarm feeds, or future live dashboards, pushing all real-time data through ClusterClient will become a bottleneck.
**Goal**: Add a dedicated gRPC server-streaming channel on each site node. Central subscribes to sites over gRPC for real-time data. ClusterClient continues to handle command/control (subscribe, unsubscribe, deploy, lifecycle) but all streaming values flow through the gRPC channel.
**Scope**: General-purpose site→central streaming transport. Debug view is the first consumer, but the proto and server are designed so future features (health streaming, alarm feeds, live dashboards) can subscribe with different event types and filters.
## Why gRPC Streaming Instead of ClusterClient
| Concern | ClusterClient | gRPC Server Streaming |
|---------|---------------|----------------------|
| **Purpose** | Cluster coordination, service discovery, request/response | High-throughput data streaming |
| **Sender preservation** | Temporary proxy ref — breaks for stored future Tells | N/A — callback-based, no actor refs cross boundary |
| **Flow control** | None (fire-and-forget Tell) | HTTP/2 flow control + Channel backpressure |
| **Scalability** | Gossip-based routing, single receptionist | Direct TCP/HTTP2 per-site, multiplexed streams |
| **Reconnection** | ClusterClient auto-reconnect (coarse, cluster-level) | gRPC channel-level reconnect per subscription |
| **Serialization** | Akka.NET Hyperion (runtime IL, fragile across versions) | Protocol Buffers (schema-driven, cross-platform) |
The DCL already uses this exact pattern — `RealLmxProxyClient` opens gRPC server-streaming subscriptions to LmxProxy servers for real-time tag value updates. This plan applies the same pattern to site→central communication.
## Architecture
```
Central Cluster                                   Site Cluster
───────────────                                   ────────────
DebugStreamBridgeActor                            InstanceActor
        │                                               │
        │──── SubscribeDebugView ──────────────────────►│  (ClusterClient: command/control)
        │◄─── DebugViewSnapshot ────────────────────────│
        │                                               │
        │                                               │ publishes AttributeValueChanged
        │                                               │ publishes AlarmStateChanged
        │                                               ▼
SiteStreamGrpcClient ◄───────── gRPC stream ────── SiteStreamGrpcServer
(per-site, on central)           (HTTP/2)          (Kestrel, on site)
        │                                               │
        │ reads from gRPC stream                        │ receives from SiteStreamManager
        │ routes by correlationId                       │ filters by instance name
        ▼
DebugStreamBridgeActor
        │
        ▼
SignalR Hub / Blazor UI
```
**Key separation**: ClusterClient handles subscribe/unsubscribe/snapshot (request-response). gRPC handles the ongoing value stream (server-streaming).
## Port & Address Configuration
### Site-Side (appsettings)
`ScadaLink:Node:GrpcPort` — explicit config setting, not derived from `RemotingPort`:
```json
"Node": {
"Role": "Site",
"NodeHostname": "scadalink-site-a-a",
"RemotingPort": 8082,
"GrpcPort": 8083
}
```
**Why explicit, not offset**: `RemotingPort` is itself a config value (8081 central, 8082 sites). A rigid offset silently breaks if someone changes `RemotingPort` to a non-standard value. Explicit ports are visible and independently configurable.
Add `GrpcPort` to `NodeOptions` (`src/ScadaLink.Host/NodeOptions.cs`):
```csharp
public int GrpcPort { get; set; } = 8083;
```
Add validation in `StartupValidator` (site role only — central doesn't host a gRPC streaming server).
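A rough sketch of the check (the exact `StartupValidator` shape is an assumption; it assumes `NodeOptions` exposes `Role`, `RemotingPort`, and the new `GrpcPort`):
```csharp
// Hypothetical validator fragment — enforced for site nodes only.
if (string.Equals(options.Role, "Site", StringComparison.OrdinalIgnoreCase))
{
    if (options.GrpcPort is < 1 or > 65535)
        throw new InvalidOperationException(
            $"ScadaLink:Node:GrpcPort must be a valid TCP port (was {options.GrpcPort}).");

    if (options.GrpcPort == options.RemotingPort)
        throw new InvalidOperationException(
            "ScadaLink:Node:GrpcPort must differ from RemotingPort.");
}
```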
### Central-Side (Database — Site Entity)
Central needs to know each site node's gRPC endpoint. Add two fields to the `Site` entity:
**Modify**: `src/ScadaLink.Commons/Entities/Sites/Site.cs`
```csharp
public class Site
{
public int Id { get; set; }
public string Name { get; set; }
public string SiteIdentifier { get; set; }
public string? Description { get; set; }
public string? NodeAAddress { get; set; } // Akka: "akka.tcp://scadalink@host:8082"
public string? NodeBAddress { get; set; } // Akka: "akka.tcp://scadalink@host:8082"
public string? GrpcNodeAAddress { get; set; } // gRPC: "http://host:8083"
public string? GrpcNodeBAddress { get; set; } // gRPC: "http://host:8083"
}
```
### Database Migration
Add `GrpcNodeAAddress` and `GrpcNodeBAddress` nullable string columns to the `Sites` table. Existing sites get `NULL` (gRPC streaming unavailable until configured).
### Management Commands
**Modify**: `src/ScadaLink.Commons/Messages/Management/SiteCommands.cs`
```csharp
public record CreateSiteCommand(
string Name, string SiteIdentifier, string? Description,
string? NodeAAddress = null, string? NodeBAddress = null,
string? GrpcNodeAAddress = null, string? GrpcNodeBAddress = null);
public record UpdateSiteCommand(
int SiteId, string Name, string? Description,
string? NodeAAddress = null, string? NodeBAddress = null,
string? GrpcNodeAAddress = null, string? GrpcNodeBAddress = null);
```
### ManagementActor Handlers
**Modify**: `src/ScadaLink.ManagementService/ManagementActor.cs`
Update `HandleCreateSite` and `HandleUpdateSite` to pass gRPC addresses through to the repository.
### CLI
**Modify**: `src/ScadaLink.CLI/Commands/SiteCommands.cs`
Add `--grpc-node-a-address` and `--grpc-node-b-address` options to `site create` and `site update`:
```sh
scadalink site create --name "Site A" --identifier site-a \
--node-a-address "akka.tcp://scadalink@site-a-a:8082" \
--node-b-address "akka.tcp://scadalink@site-a-b:8082" \
--grpc-node-a-address "http://site-a-a:8083" \
--grpc-node-b-address "http://site-a-b:8083"
```
### Central UI
**Modify**: `src/ScadaLink.CentralUI/Components/Pages/Admin/Sites.razor`
Add two form fields below the existing Node A / Node B address inputs in the site create/edit form:
```html
<label class="form-label small">gRPC Node A Address</label>
<input type="text" class="form-control form-control-sm" @bind="_formGrpcNodeAAddress"
placeholder="http://host:8083" />
<label class="form-label small">gRPC Node B Address</label>
<input type="text" class="form-control form-control-sm" @bind="_formGrpcNodeBAddress"
placeholder="http://host:8083" />
```
Add corresponding columns to the sites list table. Wire `_formGrpcNodeAAddress` / `_formGrpcNodeBAddress` into `CreateSiteCommand` / `UpdateSiteCommand` in the save handler.
### SiteStreamGrpcClientFactory
Reads `GrpcNodeAAddress` / `GrpcNodeBAddress` from the `Site` entity (loaded by `CentralCommunicationActor.LoadSiteAddressesFromDb()`) when creating per-site gRPC channels. Falls back to NodeB if NodeA connection fails (same pattern as ClusterClient dual-contact-point failover).
### Docker Compose Port Allocation
**Modify**: `docker/docker-compose.yml`
Expose gRPC ports for each site node (internal 8083):
- Site-A: `9023:8083` / `9024:8083` (nodes A/B)
- Site-B: `9033:8083` / `9034:8083`
- Site-C: `9043:8083` / `9044:8083`
### Files Affected by Port & Address Configuration
| File | Change |
|------|--------|
| `src/ScadaLink.Host/NodeOptions.cs` | Add `GrpcPort` property |
| `src/ScadaLink.Host/StartupValidator.cs` | Validate `GrpcPort` for site role |
| `src/ScadaLink.Host/appsettings.Site.json` | Add `GrpcPort: 8083` |
| `src/ScadaLink.Commons/Entities/Sites/Site.cs` | Add `GrpcNodeAAddress`, `GrpcNodeBAddress` |
| `src/ScadaLink.Commons/Messages/Management/SiteCommands.cs` | Add gRPC address params |
| `src/ScadaLink.ConfigurationDatabase/` | EF migration for new columns |
| `src/ScadaLink.ManagementService/ManagementActor.cs` | Pass gRPC addresses in handlers |
| `src/ScadaLink.CLI/Commands/SiteCommands.cs` | Add `--grpc-node-a-address` / `--grpc-node-b-address` |
| `src/ScadaLink.CentralUI/Components/Pages/Admin/Sites.razor` | Add gRPC address form fields + table columns |
| `docker/docker-compose.yml` | Expose gRPC ports |
## Proto Definition
**File**: `src/ScadaLink.Communication/Protos/sitestream.proto`
The `oneof event` pattern is extensible — future event types (health metrics, connection state changes, etc.) are added as new fields without breaking existing consumers.
```protobuf
syntax = "proto3";
option csharp_namespace = "ScadaLink.Communication.Grpc";
package sitestream;
service SiteStreamService {
// Subscribe to real-time events filtered by instance.
// Server streams events until the client cancels or the site shuts down.
rpc SubscribeInstance(InstanceStreamRequest) returns (stream SiteStreamEvent);
}
message InstanceStreamRequest {
string correlation_id = 1;
string instance_unique_name = 2;
}
message SiteStreamEvent {
string correlation_id = 1;
oneof event {
AttributeValueUpdate attribute_changed = 2;
AlarmStateUpdate alarm_changed = 3;
// Future: HealthMetricUpdate health_metric = 4;
// Future: ConnectionStateUpdate connection_state = 5;
}
}
message AttributeValueUpdate {
string instance_unique_name = 1;
string attribute_path = 2;
string attribute_name = 3;
string value = 4; // string-encoded (same as LmxProxy VtqMessage pattern)
string quality = 5; // "Good", "Uncertain", "Bad"
int64 timestamp_utc_ticks = 6;
}
message AlarmStateUpdate {
string instance_unique_name = 1;
string alarm_name = 2;
int32 state = 3; // 0=Normal, 1=Active (maps to AlarmState enum)
int32 priority = 4;
int64 timestamp_utc_ticks = 5;
}
```
Pre-generate C# stubs and check into `src/ScadaLink.Communication/SiteStreamGrpc/` (same pattern as LmxProxy — no `protoc` in Docker for ARM64 compatibility).
## Server-Streaming Pattern (Site Side)
### gRPC Server Implementation
`SiteStreamGrpcServer` inherits from `SiteStreamService.SiteStreamServiceBase`:
```csharp
public override async Task SubscribeInstance(
InstanceStreamRequest request,
IServerStreamWriter<SiteStreamEvent> responseStream,
ServerCallContext context)
{
var channel = Channel.CreateBounded<SiteStreamEvent>(
new BoundedChannelOptions(1000) { FullMode = BoundedChannelFullMode.DropOldest });
// Local actor subscribes to SiteStreamManager, writes to channel
var relayActor = actorSystem.ActorOf(
Props.Create(() => new StreamRelayActor(request, channel.Writer)));
streamManager.Subscribe(request.InstanceUniqueName, relayActor);
try
{
await foreach (var evt in channel.Reader.ReadAllAsync(context.CancellationToken))
{
await responseStream.WriteAsync(evt, context.CancellationToken);
}
}
finally
{
streamManager.RemoveSubscriber(relayActor);
actorSystem.Stop(relayActor);
}
}
```
### Channel\<T\> Bridging Pattern
`IServerStreamWriter<T>` is **not thread-safe**. Multiple Akka actors may publish events concurrently. The `Channel<SiteStreamEvent>` bridges these worlds:
```
Akka Actor Thread(s)                           gRPC Response Stream
        │                                               ▲
        │ channel.Writer.TryWrite(evt)                  │ await responseStream.WriteAsync(evt)
        ▼                                               │
┌─────────────────────────────────────────┐
│        Channel<SiteStreamEvent>         │
│       BoundedChannelOptions(1000)       │
│         FullMode = DropOldest           │
└─────────────────────────────────────────┘
```
- **Bounded capacity** (1000): prevents unbounded memory growth if the gRPC client is slow
- **DropOldest**: matches the existing `SiteStreamManager` overflow strategy
- **ReadAllAsync**: yields items as they arrive, naturally async
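The writer side of the channel is fed by the relay actor created in `SubscribeInstance`. A minimal sketch — the domain event shapes are assumptions, and the conversion mirrors the relay handler example under "Adding New Event Types" below:
```csharp
using System.Threading.Channels;
using Akka.Actor;

public class StreamRelayActor : ReceiveActor
{
    private readonly string _correlationId;
    private readonly ChannelWriter<SiteStreamEvent> _channel;

    public StreamRelayActor(InstanceStreamRequest request, ChannelWriter<SiteStreamEvent> channel)
    {
        _correlationId = request.CorrelationId;
        _channel = channel;

        Receive<AttributeValueChanged>(evt =>
        {
            var protoEvt = new SiteStreamEvent
            {
                CorrelationId = _correlationId,
                AttributeChanged = new AttributeValueUpdate { /* map domain → proto fields */ }
            };
            // TryWrite never blocks the actor; DropOldest discards on overflow.
            _channel.TryWrite(protoEvt);
        });
        // Receive<AlarmStateChanged>(...) follows the same shape.
    }
}
```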
### Kestrel HTTP/2 Setup
Site hosts switch from `Host.CreateDefaultBuilder()` to `WebApplicationBuilder` with Kestrel configured for a dedicated gRPC port:
```csharp
builder.WebHost.ConfigureKestrel(options =>
{
options.ListenAnyIP(grpcPort, listenOptions =>
{
listenOptions.Protocols = HttpProtocols.Http2; // gRPC requires HTTP/2
});
});
builder.Services.AddGrpc();
// ... existing site services ...
var app = builder.Build();
app.MapGrpcService<SiteStreamGrpcServer>();
```
Reference: `infra/lmxfakeproxy/Program.cs` uses the identical Kestrel setup.
## Stream Consumption Pattern (Central Side)
### gRPC Client Implementation
`SiteStreamGrpcClient` manages per-site gRPC channels and streaming subscriptions:
```csharp
public async Task<StreamSubscription> SubscribeAsync(
string correlationId, string instanceUniqueName,
Action<object> onEvent, CancellationToken ct)
{
var request = new InstanceStreamRequest
{
CorrelationId = correlationId,
InstanceUniqueName = instanceUniqueName
};
var call = _client.SubscribeInstance(request, cancellationToken: ct);
// Background task reads from the gRPC response stream
_ = Task.Run(async () =>
{
try
{
await foreach (var evt in call.ResponseStream.ReadAllAsync(ct))
{
var domainEvent = ConvertToDomainEvent(evt);
onEvent(domainEvent);
}
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
{
// Normal cancellation
}
}, ct);
return new StreamSubscription(correlationId, call);
}
```
Reference: `src/ScadaLink.DataConnectionLayer/Adapters/RealLmxProxyClient.cs` uses the identical background-task-reading-stream pattern for LmxProxy subscriptions.
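The returned `StreamSubscription` is a thin disposable handle over the streaming call; a plausible shape (assumed, not final):
```csharp
using Grpc.Core;

public sealed class StreamSubscription : IDisposable
{
    public string CorrelationId { get; }
    private readonly AsyncServerStreamingCall<SiteStreamEvent> _call;

    public StreamSubscription(string correlationId, AsyncServerStreamingCall<SiteStreamEvent> call)
    {
        CorrelationId = correlationId;
        _call = call;
    }

    // Disposing the call cancels the server-streaming RPC; the site-side
    // finally block then removes the SiteStreamManager subscription.
    public void Dispose() => _call.Dispose();
}
```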
### Client Factory
`SiteStreamGrpcClientFactory` caches per-site `GrpcChannel` instances (same pattern as `CentralCommunicationActor._siteClients` caching per-site ClusterClient instances).
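A sketch of the factory (method and field names are illustrative; keepalive options from the Connection Keepalive section are omitted for brevity):
```csharp
using System.Collections.Concurrent;
using Grpc.Net.Client;

public class SiteStreamGrpcClientFactory
{
    // One channel per endpoint; GrpcChannel multiplexes streams over HTTP/2.
    private readonly ConcurrentDictionary<string, GrpcChannel> _channels = new();

    public GrpcChannel GetOrCreateChannel(Site site, bool preferNodeB = false)
    {
        var endpoint = preferNodeB
            ? site.GrpcNodeBAddress ?? site.GrpcNodeAAddress
            : site.GrpcNodeAAddress ?? site.GrpcNodeBAddress;

        if (endpoint is null)
            throw new InvalidOperationException(
                $"Site '{site.Name}' has no gRPC addresses configured.");

        return _channels.GetOrAdd(endpoint, e => GrpcChannel.ForAddress(e));
    }
}
```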
## Failover & Reconnection
Four failure scenarios to handle, each with different behavior:
### 1. Site Node Failover (Active → Standby)
**What happens**: The active site node goes down. The site's Akka cluster promotes the standby to active. The Deployment Manager singleton moves to the new node, recreating Instance Actors from persisted config.
**gRPC impact**: The gRPC stream on the old node breaks — `ResponseStream.MoveNext()` throws `RpcException` on the central client.
**Central response** (`DebugStreamBridgeActor`):
1. gRPC stream breaks → `onStreamError` callback fires
2. Bridge actor receives the error, enters **reconnecting** state
3. Attempts to open a new gRPC stream to the site's **NodeB** address (via `SiteStreamGrpcClientFactory` failover)
4. If NodeB succeeds → stream resumes, new events flow. The consumer (SignalR/Blazor) sees a brief gap but no action needed.
5. If both nodes unreachable → `onTerminated` callback fires, session ends, consumer notified
**Data gap**: Events that occurred between the old node dying and the new stream connecting are lost. This is acceptable for debug view (real-time monitoring, not historical replay). If needed, the consumer can request a fresh snapshot via ClusterClient after reconnection to re-sync state.
**Reconnection timing**: The bridge actor should retry with backoff:
- Immediate retry to NodeB (its gRPC server accepts connections right away; Akka singleton handover completes within ~25s)
- If NodeB fails, retry NodeA after 5s (original node may have restarted)
- Max 3 retries, then give up and terminate the session
```csharp
// In DebugStreamBridgeActor, on gRPC stream error:
private void HandleGrpcStreamError(Exception ex)
{
_log.Warning("gRPC stream broke for {0}: {1}", _instanceUniqueName, ex.Message);
if (_retryCount >= MaxRetries)
{
_onTerminated();
Context.Stop(Self);
return;
}
_retryCount++;
// Try the other node, then cycle back
_currentEndpoint = _currentEndpoint == _grpcNodeA ? _grpcNodeB : _grpcNodeA;
Context.System.Scheduler.ScheduleTellOnce(
TimeSpan.FromSeconds(_retryCount > 1 ? 5 : 0),
Self, new ReconnectGrpcStream(), ActorRefs.NoSender);
}
```
### 2. Central Node Failover (Active → Standby)
**What happens**: The active central node goes down. The standby becomes leader within ~25s (Akka failover). Traefik detects via `/health/active` and routes new traffic to the new leader.
**gRPC impact**: All `SiteStreamGrpcClient` instances, `GrpcChannel`s, and `DebugStreamBridgeActor`s on the old central node are destroyed. The site-side gRPC server detects dead clients via keepalive (see Connection Keepalive section) and cleans up.
**Central response**: Nothing to do — the old node is dead. On the new active node:
- Users reconnect to the debug view (Blazor circuit was lost with the old node)
- CLI clients reconnect via `.WithAutomaticReconnect()` (SignalR)
- Fresh `DebugStreamBridgeActor` + gRPC stream created on demand
**No automatic session migration**: Debug sessions are not persisted. When central fails over, active debug/stream sessions end. Users re-subscribe. This is consistent with how the system already handles central failover for all stateful sessions (Blazor circuits, SignalR connections).
### 3. Network Partition (Central ↔ Site Temporarily Unreachable)
**What happens**: Network between central and site drops but both clusters are running fine.
**gRPC impact**: gRPC keepalive pings fail on both sides:
- **Site side**: Detects dead client within ~25s, tears down subscription (see Keepalive section)
- **Central side**: `ResponseStream.MoveNext()` throws `RpcException` after keepalive timeout
**Central response**: Same as site node failover — bridge actor enters reconnecting state, retries with backoff. When network recovers, reconnection succeeds and streaming resumes.
**ClusterClient behavior**: ClusterClient also detects the partition independently (transport heartbeat failure, 10s threshold). `CentralCommunicationActor` fires `ConnectionStateChanged(isConnected: false)` and sends `DebugStreamTerminated` to the bridge actor. The bridge actor may receive both the gRPC error and the `DebugStreamTerminated` — it should handle both idempotently (first one triggers reconnect/terminate, second is ignored).
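A simple guard in the bridge actor keeps the double notification idempotent (sketch; `HandleStreamFailure` is an illustrative name, `ReconnectGrpcStream` is the message used in the retry code above):
```csharp
private bool _failureHandled;

private void HandleStreamFailure(string source)   // "grpc-error" or "cluster-client"
{
    if (_failureHandled) return;   // second signal for the same outage: ignore
    _failureHandled = true;        // cleared again after a successful reconnect
    _log.Info("Stream failure reported via {0}; entering reconnect", source);
    Self.Tell(new ReconnectGrpcStream());
}
```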
### 4. Site Node Restart (Same Node Comes Back)
**What happens**: A site node restarts (e.g., Windows Service restart, container recreation). The Akka cluster reforms, Instance Actors are recreated.
**gRPC impact**: Same as site node failover — the gRPC stream on that node was broken when the process died. The gRPC server starts fresh on the restarted node.
**Central response**: Bridge actor reconnects (same retry logic as scenario 1). The new gRPC stream connects to the restarted node's fresh `SiteStreamGrpcServer`, which subscribes to the newly recreated `SiteStreamManager`.
### Reconnection State Machine (DebugStreamBridgeActor)
```
        ┌──────────────────┐
        │    Streaming     │ ◄── Normal state: gRPC stream active
        └────────┬─────────┘
                 │ gRPC stream error / keepalive timeout
                 ▼
        ┌──────────────────┐
   ┌──► │   Reconnecting   │ ── try other node endpoint
   │    └────────┬─────────┘
   │             │
   │    ┌────────┴─────────┐
   │    │                  │
   │ success        failure (retry < max)
   │    │                  │
   │    ▼                  ▼
   │ Streaming    schedule retry (5s backoff)
   │                       │
   └───────────────────────┘

                 failure (retry >= max)
                          │
                          ▼
                ┌──────────────────┐
                │    Terminated    │ ── notify consumer, stop actor
                └──────────────────┘
```
### Summary
| Scenario | Site Cleanup | Central Response | Data Gap |
|----------|-------------|-----------------|----------|
| Site failover | Automatic (process death) | Reconnect to NodeB, retry NodeA | Yes (~30s) |
| Central failover | Keepalive timeout (~25s) | Sessions lost, user re-subscribes | Session ends |
| Network partition | Keepalive timeout (~25s) | Reconnect with backoff | Yes (partition duration) |
| Site restart | Automatic (process death) | Reconnect to restarted node | Yes (~30s) |
## Backpressure and Flow Control
Three layers of flow control:
1. **gRPC/HTTP2**: Built-in TCP flow control. If the central client is slow reading, the site's `WriteAsync` eventually blocks.
2. **Channel\<T\>**: Bounded at 1000 with DropOldest. If gRPC is backpressured AND the channel fills, oldest events are dropped (consistent with SiteStreamManager's existing overflow strategy).
3. **SiteStreamManager**: Akka.Streams source with per-subscriber bounded buffer (configurable via `SiteRuntimeOptions.StreamBufferSize`, default 1000, DropHead).
## Connection Keepalive & Orphan Stream Prevention
If the central cluster crashes, loses network, or fails over without unsubscribing, the site must detect the dead client and tear down the streaming subscription. Three complementary layers handle this:
### 1. TCP-Level Detection (seconds — clean disconnect)
When the central process dies or the TCP connection resets cleanly, the site's `IServerStreamWriter.WriteAsync()` throws `RpcException` with `StatusCode.Cancelled`. The gRPC method's `finally` block runs, cleaning up the SiteStreamManager subscription. This fires within seconds on a clean TCP RST.
### 2. gRPC Keepalive Pings (10–25s — network partition / silent failure)
gRPC supports HTTP/2 PING frames for proactive liveness detection. Configure on both sides:
**Site server (Kestrel):**
```csharp
builder.Services.AddGrpc(options =>
{
// If the server sends a ping and gets no ACK within this timeout, it closes the connection.
// This catches silent client death (crash without TCP RST, network partition).
options.KeepAliveTimeout = TimeSpan.FromSeconds(20);
});
// Kestrel HTTP/2 keep-alive settings
builder.WebHost.ConfigureKestrel(options =>
{
options.ListenAnyIP(grpcPort, listenOptions =>
{
listenOptions.Protocols = HttpProtocols.Http2;
});
// Kestrel sends HTTP/2 PING frames at this interval
options.Limits.Http2.KeepAlivePingDelay = TimeSpan.FromSeconds(15);
// Close connection if PING ACK not received within this timeout
options.Limits.Http2.KeepAlivePingTimeout = TimeSpan.FromSeconds(10);
});
```
**Central client (GrpcChannel):**
```csharp
var channel = GrpcChannel.ForAddress(endpoint, new GrpcChannelOptions
{
HttpHandler = new SocketsHttpHandler
{
// Client sends PING frames at this interval to keep the connection alive
// and detect server-side death
KeepAlivePingDelay = TimeSpan.FromSeconds(15),
// Close connection if no PING ACK within this timeout
KeepAlivePingTimeout = TimeSpan.FromSeconds(10),
// Send pings even when no active streams (keeps channel warm for fast reconnect)
KeepAlivePingPolicy = HttpKeepAlivePingPolicy.Always
}
});
```
**Detection timeline**: If central dies silently (no TCP RST), the site detects within `KeepAlivePingDelay + KeepAlivePingTimeout` = ~25 seconds. The `ServerCallContext.CancellationToken` fires, unblocking the `ReadAllAsync` loop, and the `finally` block cleans up.
### 3. Server-Side Stream Timeout (safety net — defense in depth)
As a final safety net, the gRPC server method can enforce a maximum stream duration or idle timeout. This catches edge cases where keepalive pings are disabled or misconfigured:
```csharp
// In SiteStreamGrpcServer.SubscribeInstance():
// Link the gRPC cancellation token with a maximum session timeout
using var sessionTimeout = CancellationTokenSource.CreateLinkedTokenSource(
context.CancellationToken);
sessionTimeout.CancelAfter(TimeSpan.FromHours(4)); // max stream lifetime
await foreach (var evt in channel.Reader.ReadAllAsync(sessionTimeout.Token))
{
await responseStream.WriteAsync(evt, sessionTimeout.Token);
}
```
The 4-hour lifetime is a safety net, not the primary detection mechanism. Normal cleanup happens via gRPC keepalive (25s) or TCP reset (seconds).
### Summary of Detection Layers
| Layer | Detects | Timeline | Mechanism |
|-------|---------|----------|-----------|
| TCP RST | Clean process death, connection close | 1–5s | OS-level TCP, `WriteAsync` throws |
| gRPC keepalive PING | Network partition, silent crash, firewall drop | ~25s | HTTP/2 PING frames, `CancellationToken` fires |
| Session timeout | Misconfigured keepalive, long-lived zombie streams | 4 hours | `CancellationTokenSource.CancelAfter` |
All three trigger the same cleanup path: `CancellationToken` cancels → `ReadAllAsync` exits → `finally` block removes SiteStreamManager subscription and stops relay actor.
### Configuration Defaults
Add to `CommunicationOptions` (`src/ScadaLink.Communication/CommunicationOptions.cs`):
```csharp
public TimeSpan GrpcKeepAlivePingDelay { get; set; } = TimeSpan.FromSeconds(15);
public TimeSpan GrpcKeepAlivePingTimeout { get; set; } = TimeSpan.FromSeconds(10);
public TimeSpan GrpcMaxStreamLifetime { get; set; } = TimeSpan.FromHours(4);
```
Bind from `appsettings.json` under `ScadaLink:Communication` (existing options section).
## Security Considerations
- **Internal Docker network**: Plain HTTP/2 (no TLS) — all site↔central traffic flows within `scadalink-net` Docker bridge network. Same as current Akka.NET remoting (unencrypted by default).
- **Production**: Enable TLS on the Kestrel gRPC endpoint via HTTPS certificate configuration. The `GrpcChannel` on central switches to `https://` scheme.
- **No authentication on the gRPC channel**: The channel is internal infrastructure (site↔central only). Authentication happens at the user-facing boundaries (LDAP/JWT for UI, Basic Auth for CLI/Management API).
## Adding New Event Types
The streaming channel is designed to carry any event type that can be scoped to an instance. Adding a new event type requires changes at four layers: proto, SiteStreamManager, gRPC server relay, and gRPC client conversion.
### Requirements for a New Event Type
1. **Must carry `InstanceUniqueName`** — `SiteStreamManager.ForwardToSubscribers()` filters by instance name. Without it, the event can't be routed to the correct subscriber.
2. **Must be published to `SiteStreamManager`** — the gRPC server subscribes to `SiteStreamManager` via a relay actor. Any event that reaches the relay actor gets written to the gRPC stream.
3. **Must have a proto message** — the event crosses the gRPC wire boundary, so it needs a protobuf representation in `sitestream.proto`.
4. **Must be added to the `oneof event` in `SiteStreamEvent`** — this is how the client knows which event type arrived. Existing field numbers are never reused.
### Step-by-Step: Adding a New Event Type
Example: adding `ScriptErrorEvent` to the stream.
#### 1. Define the domain record
**File**: `src/ScadaLink.Commons/Messages/Streaming/ScriptErrorEvent.cs`
```csharp
namespace ScadaLink.Commons.Messages.Streaming;
public record ScriptErrorEvent(
string InstanceUniqueName, // Required for filtering
string ScriptName,
string ErrorMessage,
DateTimeOffset Timestamp);
```
#### 2. Add the proto message
**File**: `src/ScadaLink.Communication/Protos/sitestream.proto`
Add the message definition and a new field to the `oneof`:
```protobuf
message SiteStreamEvent {
string correlation_id = 1;
oneof event {
AttributeValueUpdate attribute_changed = 2;
AlarmStateUpdate alarm_changed = 3;
ScriptErrorUpdate script_error = 4; // ← new field, next available number
}
}
message ScriptErrorUpdate {
string instance_unique_name = 1;
string script_name = 2;
string error_message = 3;
int64 timestamp_utc_ticks = 4;
}
```
Re-generate the C# stubs and check them into `SiteStreamGrpc/`.
#### 3. Publish to SiteStreamManager
**File**: `src/ScadaLink.SiteRuntime/Streaming/SiteStreamManager.cs`
Add a publish method (follows the existing pattern exactly):
```csharp
public void PublishScriptError(ScriptErrorEvent error)
{
_sourceActor?.Tell(error);
ForwardToSubscribers(error.InstanceUniqueName, error);
}
```
`ForwardToSubscribers` already accepts `object message` — no changes needed to the forwarding infrastructure. It filters by `instanceName` and Tells the message to all matching subscribers.
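For orientation, the forwarding is roughly this shape (illustrative only — the real code and its subscriber bookkeeping live in `SiteStreamManager.cs`):
```csharp
private void ForwardToSubscribers(string instanceName, object message)
{
    foreach (var (subscriber, subscribedInstance) in _subscribers)
    {
        if (subscribedInstance == instanceName)
            subscriber.Tell(message);   // any event type; relay actors pattern-match
    }
}
```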
#### 4. Call the publish method from the source actor
**File**: wherever the event originates (e.g., `ScriptExecutionActor.cs`)
```csharp
_streamManager?.PublishScriptError(new ScriptErrorEvent(
_instanceUniqueName, _scriptName, ex.Message, DateTimeOffset.UtcNow));
```
#### 5. Handle in the gRPC server relay actor
**File**: `src/ScadaLink.Communication/Grpc/SiteStreamGrpcServer.cs` (the `StreamRelayActor`)
The relay actor receives events from `SiteStreamManager` and writes them to the `Channel<SiteStreamEvent>`. Add a `Receive<ScriptErrorEvent>` handler that converts to the proto message:
```csharp
Receive<ScriptErrorEvent>(evt =>
{
var protoEvt = new SiteStreamEvent
{
CorrelationId = _correlationId,
ScriptError = new ScriptErrorUpdate
{
InstanceUniqueName = evt.InstanceUniqueName,
ScriptName = evt.ScriptName,
ErrorMessage = evt.ErrorMessage,
TimestampUtcTicks = evt.Timestamp.UtcTicks
}
};
_channel.TryWrite(protoEvt);
});
```
#### 6. Handle in the gRPC client converter
**File**: `src/ScadaLink.Communication/Grpc/SiteStreamGrpcClient.cs` (the `ConvertToDomainEvent` method)
Add a case for the new proto `oneof` variant:
```csharp
private static object ConvertToDomainEvent(SiteStreamEvent evt) => evt.EventCase switch
{
SiteStreamEvent.EventOneofCase.AttributeChanged => /* existing */,
SiteStreamEvent.EventOneofCase.AlarmChanged => /* existing */,
SiteStreamEvent.EventOneofCase.ScriptError => new ScriptErrorEvent(
evt.ScriptError.InstanceUniqueName,
evt.ScriptError.ScriptName,
evt.ScriptError.ErrorMessage,
new DateTimeOffset(evt.ScriptError.TimestampUtcTicks, TimeSpan.Zero)),
_ => evt // Unknown types pass through as raw proto
};
```
#### 7. Handle in the consumer (if needed)
The `DebugStreamBridgeActor` already passes events to its `_onEvent` callback as `object`. Consumers that care about the new type add a pattern match:
```csharp
// In SignalR hub callback:
ScriptErrorEvent error =>
hubClients.Client(connectionId).SendAsync("OnScriptError", error),
// In Blazor component:
case ScriptErrorEvent error:
_scriptErrors.Add(error);
_ = InvokeAsync(StateHasChanged);
break;
```
### Checklist for New Event Types
| Step | File(s) | What to Do |
|------|---------|------------|
| Domain record | `Commons/Messages/Streaming/` | Create record with `InstanceUniqueName` field |
| Proto message | `Communication/Protos/sitestream.proto` | Add message + `oneof` field (next number) |
| Regenerate stubs | `Communication/SiteStreamGrpc/` | Run `protoc`, check in generated files |
| Publish method | `SiteRuntime/Streaming/SiteStreamManager.cs` | Add `PublishXxx()` method |
| Source actor | Wherever event originates | Call `_streamManager?.PublishXxx(...)` |
| Server relay | `Communication/Grpc/SiteStreamGrpcServer.cs` | Add `Receive<T>` → proto conversion |
| Client converter | `Communication/Grpc/SiteStreamGrpcClient.cs` | Add `EventOneofCase` → domain conversion |
| Consumer (optional) | Hub, Blazor, CLI | Add pattern match for new type |
### Proto Versioning Rules
- **Never reuse field numbers** — deleted fields' numbers are reserved forever
- **Add new `oneof` variants with the next available field number** — old clients ignore unknown fields
- **Never change existing field types or numbers** — this breaks wire compatibility
- **The `oneof` pattern guarantees forward compatibility** — a client that doesn't know about `ScriptErrorUpdate` simply sees `EventCase == None` and can skip it
## Existing Codebase References
| Pattern | File | Relevance |
|---------|------|-----------|
| Proto file definition | `src/ScadaLink.DataConnectionLayer/Adapters/Protos/scada.proto` | Same proto3 syntax, server streaming (`stream VtqMessage`) |
| Pre-generated C# stubs | `src/ScadaLink.DataConnectionLayer/Adapters/LmxProxyGrpc/` | Same approach — checked-in stubs, no `protoc` at build time |
| gRPC client + stream reader | `src/ScadaLink.DataConnectionLayer/Adapters/RealLmxProxyClient.cs` | Background task reads `ResponseStream`, invokes callback |
| gRPC server implementation | `infra/lmxfakeproxy/Services/ScadaServiceImpl.cs` | Service base class override pattern |
| Kestrel HTTP/2 setup | `infra/lmxfakeproxy/Program.cs` | `HttpProtocols.Http2`, `AddGrpc()`, `MapGrpcService<T>()` |
| SiteStreamManager | `src/ScadaLink.SiteRuntime/Streaming/SiteStreamManager.cs` | Subscribe/filter by instance, per-subscriber buffer, DropHead overflow |
| Per-site client caching | `CentralCommunicationActor._siteClients` dictionary | One client per site, refresh on address change |
| Bridge actor pattern | `src/ScadaLink.Communication/Actors/DebugStreamBridgeActor.cs` | Per-session actor with callbacks, adapted to use gRPC instead of Akka messages |
## Implementation Summary
### New Files
| File | Purpose |
|------|---------|
| `src/ScadaLink.Communication/Protos/sitestream.proto` | Proto definition |
| `src/ScadaLink.Communication/SiteStreamGrpc/` | Pre-generated C# stubs |
| `src/ScadaLink.Communication/Grpc/SiteStreamGrpcServer.cs` | Site-side gRPC streaming server |
| `src/ScadaLink.Communication/Grpc/SiteStreamGrpcClient.cs` | Central-side gRPC streaming client |
| `src/ScadaLink.Communication/Grpc/SiteStreamGrpcClientFactory.cs` | Per-site gRPC client factory/cache; reads `GrpcNodeAAddress`/`GrpcNodeBAddress` from the `Site` entity |
### Modified Files
| File | Change |
|------|--------|
| `src/ScadaLink.Communication/ScadaLink.Communication.csproj` | Add `Grpc.AspNetCore` + `Grpc.Net.Client` packages |
| `src/ScadaLink.Host/Program.cs` | Site: switch to WebApplicationBuilder + Kestrel gRPC |
| `src/ScadaLink.Communication/Actors/DebugStreamBridgeActor.cs` | Use gRPC client for streaming (ClusterClient for snapshot only) |
| `src/ScadaLink.Communication/DebugStreamService.cs` | Inject `SiteStreamGrpcClientFactory` |
| `src/ScadaLink.SiteRuntime/Actors/InstanceActor.cs` | Remove `DebugStreamEvent` forwarding (publish to SiteStreamManager only) |
| `src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs` | Remove `Receive<DebugStreamEvent>` handler |
| `src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs` | Remove `HandleDebugStreamEvent` |
| `docker/docker-compose.yml` | Expose gRPC ports for site nodes |
### Deleted Files
| File | Reason |
|------|--------|
| `src/ScadaLink.Commons/Messages/DebugView/DebugStreamEvent.cs` | No longer needed — events flow via gRPC, not ClusterClient |
## Design Review Notes
The following concerns were identified during external review. Items marked **[V1]** should be addressed in the initial implementation. Items marked **[Future]** are noted for consideration as the transport scales beyond debug view.
### [V1] Snapshot-to-Stream Handoff Race
The initial snapshot arrives via ClusterClient, then the gRPC stream opens separately. Events between snapshot generation and stream establishment can be missed or duplicated.
**Mitigation**: Open the gRPC stream **first**, then request the snapshot via ClusterClient. The gRPC stream buffers events from the moment it connects. The consumer applies the snapshot as the baseline, then replays any buffered gRPC events with timestamps newer than the snapshot. This is a simple timestamp-based dedup — no sequence numbers needed for V1.
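A sketch of the stream-first handoff — `SubscribeAsync` matches the client API above; `RequestSnapshotAsync`, `ApplySnapshot`, `ApplyEvent`, `EventTimestamp`, and `TakenAtUtc` are illustrative names, and the buffering-to-live swap would need proper synchronization in real code:
```csharp
var buffered = new List<object>();
var buffering = true;

// 1. Open the gRPC stream first so no events are missed while snapshotting.
var subscription = await grpcClient.SubscribeAsync(correlationId, instanceName, evt =>
{
    if (buffering) buffered.Add(evt);
    else ApplyEvent(evt);
}, ct);

// 2. Request the snapshot via ClusterClient and apply it as the baseline.
var snapshot = await RequestSnapshotAsync(instanceName);
ApplySnapshot(snapshot);

// 3. Replay buffered events newer than the snapshot, then go live.
foreach (var evt in buffered.Where(e => EventTimestamp(e) > snapshot.TakenAtUtc))
    ApplyEvent(evt);
buffering = false;
```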
### [V1] Stream Authority — Which Site Node to Connect To
Both site nodes may be running, but only the active node (hosting the Deployment Manager singleton) has live Instance Actors and a populated SiteStreamManager.
**Rule**: Central connects to the site node whose Akka address matches the singleton owner. In practice, `CentralCommunicationActor` already tracks which site node is reachable via ClusterClient — the gRPC client factory should use the same node selection. Try `GrpcNodeAAddress` first; on failure, try `GrpcNodeBAddress`.
The standby site node's gRPC server will accept connections but its SiteStreamManager will have no subscribers and no events. A connected stream to the standby will simply be idle (no data). On site failover, the bridge actor reconnects and picks up the new active node.
### [V1] Startup/Shutdown Ordering
Switching site host to `WebApplicationBuilder` requires coordinating ASP.NET Core and Akka.NET lifecycles:
- **Startup**: Actor system and SiteStreamManager must be initialized before `MapGrpcService<SiteStreamGrpcServer>()` begins accepting connections. Gate gRPC readiness on actor system startup (reject streams with `StatusCode.Unavailable` until ready; see the sketch after this list).
- **Shutdown**: On `CoordinatedShutdown`, stop accepting new gRPC streams first, cancel all active streams (triggering client reconnect), then tear down actors. Use `IHostApplicationLifetime.ApplicationStopping` to signal the gRPC server.
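Readiness gating sketch — `_readiness` is a hypothetical shared flag the host sets once the actor system and SiteStreamManager are up:
```csharp
public override async Task SubscribeInstance(
    InstanceStreamRequest request,
    IServerStreamWriter<SiteStreamEvent> responseStream,
    ServerCallContext context)
{
    if (!_readiness.IsReady)
    {
        // Clients treat Unavailable as retryable and back off.
        throw new RpcException(new Status(
            StatusCode.Unavailable, "Site actor system is still starting."));
    }
    // ... normal subscription path from the server section above ...
}
```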
### [V1] Duplicate Stream Prevention
Central bugs or reconnect storms could create multiple gRPC streams for the same instance/correlationId.
**Rule**: `SiteStreamGrpcServer` tracks active subscriptions by `correlation_id`. If a new `SubscribeInstance` arrives with a `correlation_id` that's already active, cancel the old stream before starting the new one. One stream per correlation ID.
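A sketch of the enforcement inside `SiteStreamGrpcServer` (field names assumed):
```csharp
private readonly ConcurrentDictionary<string, CancellationTokenSource> _activeStreams = new();

// At the top of SubscribeInstance():
var cts = CancellationTokenSource.CreateLinkedTokenSource(context.CancellationToken);
_activeStreams.AddOrUpdate(
    request.CorrelationId,
    _ => cts,
    (_, stale) => { stale.Cancel(); return cts; });   // evict the previous stream

// ...stream using cts.Token instead of context.CancellationToken...

// In the finally block — remove only if this stream still owns the entry:
_activeStreams.TryRemove(
    new KeyValuePair<string, CancellationTokenSource>(request.CorrelationId, cts));
```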
### [V1] Observability
Add metrics for silent degradation detection:
| Metric | Source | Purpose |
|--------|--------|---------|
| `grpc_streams_active` | SiteStreamGrpcServer | Active stream count per site node |
| `grpc_streams_events_sent` | SiteStreamGrpcServer | Events written to gRPC, by type |
| `grpc_streams_events_dropped` | Channel writer | Events dropped due to bounded buffer overflow |
| `grpc_streams_reconnects` | DebugStreamBridgeActor | Reconnection count and reason |
| `grpc_streams_duration` | SiteStreamGrpcServer | Stream lifetime (histogram) |
Emit via Serilog structured logging (existing infrastructure). Consider Prometheus counters if metrics export is added later.
### [V1] Proto Improvements
Use protobuf-native types instead of .NET-specific scalars:
```protobuf
import "google/protobuf/timestamp.proto";
enum Quality {
QUALITY_UNSPECIFIED = 0;
QUALITY_GOOD = 1;
QUALITY_UNCERTAIN = 2;
QUALITY_BAD = 3;
}
enum AlarmState {
ALARM_STATE_UNSPECIFIED = 0;
ALARM_STATE_NORMAL = 1;
ALARM_STATE_ACTIVE = 2;
}
message AttributeValueUpdate {
string instance_unique_name = 1;
string attribute_path = 2;
string attribute_name = 3;
string value = 4;
Quality quality = 5;
google.protobuf.Timestamp timestamp = 6;
}
message AlarmStateUpdate {
string instance_unique_name = 1;
string alarm_name = 2;
AlarmState state = 3;
int32 priority = 4;
google.protobuf.Timestamp timestamp = 5;
}
```
Reserve enum zero as `UNSPECIFIED` per proto3 convention. Use `google.protobuf.Timestamp` instead of `int64` ticks for cross-platform compatibility.
### [V1] Max Concurrent Streams
Define limits to prevent resource exhaustion:
- Max 100 concurrent gRPC streams per site node (configurable via `CommunicationOptions`)
- Server rejects with `StatusCode.ResourceExhausted` when limit reached
- One stream per `correlation_id` (duplicate prevention above)
## Documentation Updates
All documentation changes must be completed as part of the implementation — not deferred. The design docs are the source of truth for this system's architecture.
### High-Level Requirements
**Modify**: `docs/requirements/HighLevelReqs.md`
Update the following sections:
- **Section 5 (Central↔Site Communication)**: Add gRPC streaming as a transport alongside ClusterClient. Clarify that ClusterClient handles command/control and gRPC handles real-time data streaming.
- **Section 13 (Non-Functional / Performance)**: Add gRPC streaming throughput expectations and backpressure behavior if applicable.
### Component-Level Requirements
| Document | Changes |
|----------|---------|
| `docs/requirements/Component-Communication.md` | Pattern 6 (Debug Streaming): Replace ClusterClient streaming path with gRPC. Add `SiteStreamGrpcServer`, `SiteStreamGrpcClient`, `SiteStreamGrpcClientFactory` to component responsibilities. Add gRPC port configuration to shared settings. Update dependencies/interactions. |
| `docs/requirements/Component-SiteRuntime.md` | Update SiteStreamManager section: note that gRPC server subscribes to the stream for cross-cluster delivery. InstanceActor no longer forwards `DebugStreamEvent` directly. |
| `docs/requirements/Component-Host.md` | Site host now uses `WebApplicationBuilder` with Kestrel HTTP/2 for gRPC. Document `GrpcPort` config, startup/shutdown ordering with Akka.NET. |
| `docs/requirements/Component-CentralUI.md` | Debug view streaming path updated (gRPC, not ClusterClient for events). |
| `docs/requirements/Component-CLI.md` | `site create` / `site update` commands updated with `--grpc-node-a-address` / `--grpc-node-b-address`. |
| `docs/requirements/Component-ConfigurationDatabase.md` | Migration for `GrpcNodeAAddress` / `GrpcNodeBAddress` on Sites table. |
| `docs/requirements/Component-ClusterInfrastructure.md` | Note gRPC port alongside Akka remoting port in node configuration. |
### CLAUDE.md
Update key design decisions:
- Add "gRPC streaming for site→central real-time data; ClusterClient for command/control only" under Data & Communication
- Add gRPC port convention under Architecture & Runtime
- Update current component count if a new component is introduced
### README.md
Update the architecture diagram to show the gRPC streaming channel between site and central clusters.
## Testing Strategy
### Tests to Update (Existing)
| Test File | Change |
|-----------|--------|
| `tests/ScadaLink.SiteRuntime.Tests/Actors/InstanceActorIntegrationTests.cs` | Remove `DebugStreamEventForwarder` test helper and `DebugStreamEvent` expectations. Debug subscriber tests should verify events reach SiteStreamManager only (not direct forwarding). |
| `tests/ScadaLink.Communication.Tests/` | Remove any tests for `DebugStreamEvent` routing through `CentralCommunicationActor` and `SiteCommunicationActor`. |
| `tests/ScadaLink.Host.Tests/HealthCheckTests.cs` | May need updates if site host builder changes affect test factory setup. |
### New Tests Required
| Test | Project | What to Verify |
|------|---------|----------------|
| `SiteStreamGrpcServerTests` | `ScadaLink.Communication.Tests` | Server accepts subscription, relays events from mock SiteStreamManager to gRPC stream, cleans up on cancellation, rejects duplicate `correlation_id`, enforces max concurrent streams limit, rejects before actor system ready. |
| `SiteStreamGrpcClientTests` | `ScadaLink.Communication.Tests` | Client connects, reads stream, converts proto→domain types, invokes callback, handles stream errors, reconnects to NodeB on failure. |
| `SiteStreamGrpcClientFactoryTests` | `ScadaLink.Communication.Tests` | Creates and caches per-site clients, derives endpoints from Site entity, disposes on site removal. |
| `DebugStreamBridgeActorTests` (update) | `ScadaLink.Communication.Tests` | Verify bridge actor opens gRPC stream after snapshot, receives events via gRPC callback (not Akka messages), reconnects on stream error with node failover, terminates after max retries. |
| `GrpcStreamIntegrationTest` | `ScadaLink.IntegrationTests` | End-to-end: site gRPC server → central gRPC client → bridge actor → callback. Use in-process test server (`WebApplicationFactory` or `TestServer`). Verify event delivery, cancellation cleanup, keepalive behavior. |
| `SiteHostStartupTests` | `ScadaLink.Host.Tests` | Site host starts with `WebApplicationBuilder`, gRPC port configured, `MapGrpcService` registered, rejects streams before actor system ready. |
| `ProtoRoundtripTests` | `ScadaLink.Communication.Tests` | Serialize/deserialize each proto message type, verify `oneof` discrimination, verify enum mappings, verify `google.protobuf.Timestamp` conversion. |
### Test Coverage Guardrails
To ensure the implementation stays compliant with this plan:
1. **Proto contract tests**: A test that loads `sitestream.proto` and verifies all `oneof` variants have handlers in both `StreamRelayActor` (server) and `ConvertToDomainEvent` (client). If a new proto field is added without handlers, the test fails. Prevents silent event type gaps. (The client half is sketched after this list.)
2. **Architectural constraint test**: Add to `ScadaLink.Commons.Tests/ArchitecturalConstraintTests.cs` — verify that `DebugStreamEvent` type no longer exists in the assembly (ensures the ClusterClient streaming path is fully removed and doesn't creep back).
3. **Startup validation test**: Site host integration test that verifies gRPC server rejects `SubscribeInstance` calls with `StatusCode.Unavailable` before the actor system is ready, and accepts them after.
4. **Cleanup verification test**: gRPC server test that verifies after stream cancellation, the SiteStreamManager subscription count returns to its pre-test value (no leaked subscriptions).
5. **No ClusterClient streaming regression**: Integration test that subscribes via gRPC, triggers attribute changes, and verifies events arrive via gRPC callback — NOT via `DebugStreamEvent` through `CentralCommunicationActor`. This prevents accidental reintroduction of the ClusterClient streaming path.
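A sketch of the client half of guardrail 1 — it assumes `ConvertToDomainEvent` is exposed to the test assembly (e.g. via `InternalsVisibleTo`) and a hypothetical `BuildSampleEvent` helper that populates one `oneof` variant per case:
```csharp
using System.Linq;
using Xunit;

[Fact]
public void EveryOneofVariantHasAClientConverter()
{
    var variants = Enum.GetValues<SiteStreamEvent.EventOneofCase>()
        .Where(c => c != SiteStreamEvent.EventOneofCase.None);

    foreach (var variant in variants)
    {
        var evt = BuildSampleEvent(variant);                        // hypothetical helper
        var domain = SiteStreamGrpcClient.ConvertToDomainEvent(evt);

        // A raw proto passthrough means the variant has no client handler.
        Assert.IsNotType<SiteStreamEvent>(domain);
    }
}
```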
## Implementation Plan Guardrails
When creating implementation plans (work packages) from this document, each plan must include:
1. **Pre-implementation checklist**:
- [ ] Identify all requirement docs affected by this work package
- [ ] Identify all existing tests that need updating
- [ ] List new tests required before marking the work package complete
2. **Per-step requirements**:
- Every new public class/interface must have covering unit tests
- Every modified actor must have its existing tests updated to reflect new behavior
- Every proto change must have roundtrip serialization tests
- Every config change must be validated in `StartupValidator` with a covering test
3. **Completion criteria** (a work package is not done until):
- [ ] All identified requirement docs are updated
- [ ] All existing tests pass (no regressions)
- [ ] All new tests listed in this document are implemented and passing
- [ ] `dotnet build` succeeds with zero warnings
- [ ] CLAUDE.md key design decisions are updated if architectural choices changed
- [ ] `docker/deploy.sh` succeeds and end-to-end streaming verified manually
4. **Review checkpoint**: After each implementation phase (site server, central client, cleanup, config), run the full test suite and verify the streaming path end-to-end before proceeding to the next phase. Do not batch all phases into a single commit.