docs: add gRPC streaming channel implementation plan with task tracking
This commit is contained in:
634
docs/plans/2026-03-21-grpc-streaming-channel.md
Normal file
634
docs/plans/2026-03-21-grpc-streaming-channel.md
Normal file
@@ -0,0 +1,634 @@
|
||||
# gRPC Streaming Channel Implementation Plan
|
||||
|
||||
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers-extended-cc:executing-plans to implement this plan task-by-task.
|
||||
|
||||
**Goal:** Replace ClusterClient-based debug event streaming with a dedicated gRPC server-streaming channel from site nodes to central, following the design in `docs/plans/grpc_streams.md`.
|
||||
|
||||
**Architecture:** Each site node runs a gRPC server (`SiteStreamGrpcServer`) on a dedicated HTTP/2 port. Central creates per-site gRPC clients (`SiteStreamGrpcClient`) that open server-streaming subscriptions filtered by instance name. Events flow: SiteStreamManager → relay actor → Channel\<T\> → gRPC stream → central callback → DebugStreamBridgeActor → SignalR/Blazor. ClusterClient continues handling command/control (subscribe/unsubscribe/snapshot).
|
||||
|
||||
**Tech Stack:** .NET 10, gRPC (Grpc.AspNetCore + Grpc.Net.Client), Protocol Buffers, Akka.NET, ASP.NET Core Kestrel, System.Threading.Channels
|
||||
|
||||
**Design Reference:** `docs/plans/grpc_streams.md` — full architecture, proto definition, failover, keepalive, backpressure, and review notes.
|
||||
|
||||
---
|
||||
|
||||
### Task 0: Proto Definition & Stub Generation
|
||||
|
||||
**Files:**
|
||||
- Create: `src/ScadaLink.Communication/Protos/sitestream.proto`
|
||||
- Create: `src/ScadaLink.Communication/SiteStreamGrpc/` (generated stubs)
|
||||
- Modify: `src/ScadaLink.Communication/ScadaLink.Communication.csproj`
|
||||
|
||||
**Step 1: Create the proto file**
|
||||
|
||||
Create `src/ScadaLink.Communication/Protos/sitestream.proto` with the proto definition from `docs/plans/grpc_streams.md` "Proto Improvements" section (V1 review notes version with enums and `google.protobuf.Timestamp`):
|
||||
|
||||
```protobuf
|
||||
syntax = "proto3";
|
||||
option csharp_namespace = "ScadaLink.Communication.Grpc";
|
||||
package sitestream;
|
||||
|
||||
import "google/protobuf/timestamp.proto";
|
||||
|
||||
service SiteStreamService {
|
||||
rpc SubscribeInstance(InstanceStreamRequest) returns (stream SiteStreamEvent);
|
||||
}
|
||||
|
||||
message InstanceStreamRequest {
|
||||
string correlation_id = 1;
|
||||
string instance_unique_name = 2;
|
||||
}
|
||||
|
||||
message SiteStreamEvent {
|
||||
string correlation_id = 1;
|
||||
oneof event {
|
||||
AttributeValueUpdate attribute_changed = 2;
|
||||
AlarmStateUpdate alarm_changed = 3;
|
||||
}
|
||||
}
|
||||
|
||||
enum Quality {
|
||||
QUALITY_UNSPECIFIED = 0;
|
||||
QUALITY_GOOD = 1;
|
||||
QUALITY_UNCERTAIN = 2;
|
||||
QUALITY_BAD = 3;
|
||||
}
|
||||
|
||||
enum AlarmStateEnum {
|
||||
ALARM_STATE_UNSPECIFIED = 0;
|
||||
ALARM_STATE_NORMAL = 1;
|
||||
ALARM_STATE_ACTIVE = 2;
|
||||
}
|
||||
|
||||
message AttributeValueUpdate {
|
||||
string instance_unique_name = 1;
|
||||
string attribute_path = 2;
|
||||
string attribute_name = 3;
|
||||
string value = 4;
|
||||
Quality quality = 5;
|
||||
google.protobuf.Timestamp timestamp = 6;
|
||||
}
|
||||
|
||||
message AlarmStateUpdate {
|
||||
string instance_unique_name = 1;
|
||||
string alarm_name = 2;
|
||||
AlarmStateEnum state = 3;
|
||||
int32 priority = 4;
|
||||
google.protobuf.Timestamp timestamp = 5;
|
||||
}
|
||||
```
|
||||
|
||||
**Step 2: Add gRPC NuGet packages**
|
||||
|
||||
Add to `src/ScadaLink.Communication/ScadaLink.Communication.csproj`:
|
||||
```xml
|
||||
<PackageReference Include="Grpc.AspNetCore" Version="2.71.0" />
|
||||
<PackageReference Include="Grpc.Net.Client" Version="2.71.0" />
|
||||
<PackageReference Include="Google.Protobuf" Version="3.29.3" />
|
||||
<PackageReference Include="Grpc.Tools" Version="2.71.0" PrivateAssets="All" />
|
||||
```
|
||||
|
||||
Also add `<FrameworkReference Include="Microsoft.AspNetCore.App" />` if not already present (needed for `Grpc.AspNetCore`).
|
||||
|
||||
**Step 3: Generate C# stubs**
|
||||
|
||||
Run `protoc` locally to generate stubs. Check generated files into `src/ScadaLink.Communication/SiteStreamGrpc/`. Follow the same pattern as `src/ScadaLink.DataConnectionLayer/Adapters/LmxProxyGrpc/` — pre-generated, no protoc at build time.
|
||||
|
||||
**Step 4: Verify build**
|
||||
|
||||
Run: `dotnet build src/ScadaLink.Communication/`
|
||||
Expected: Build succeeded, 0 errors
|
||||
|
||||
**Step 5: Write proto roundtrip tests**
|
||||
|
||||
Create `tests/ScadaLink.Communication.Tests/Grpc/ProtoRoundtripTests.cs`:
|
||||
- Test `AttributeValueUpdate` serialization/deserialization with all Quality enum values
|
||||
- Test `AlarmStateUpdate` serialization/deserialization with all AlarmStateEnum values
|
||||
- Test `SiteStreamEvent` oneof discrimination (attribute vs alarm)
|
||||
- Test `google.protobuf.Timestamp` conversion to/from `DateTimeOffset`
|
||||
|
||||
**Step 6: Run tests**
|
||||
|
||||
Run: `dotnet test tests/ScadaLink.Communication.Tests/`
|
||||
Expected: All pass including new proto tests
|
||||
|
||||
**Step 7: Commit**
|
||||
|
||||
```bash
|
||||
git add src/ScadaLink.Communication/Protos/ src/ScadaLink.Communication/SiteStreamGrpc/ src/ScadaLink.Communication/ScadaLink.Communication.csproj tests/ScadaLink.Communication.Tests/
|
||||
git commit -m "feat: add sitestream.proto definition and generated gRPC stubs"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 1: Site Config — GrpcPort in NodeOptions
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/ScadaLink.Host/NodeOptions.cs:8`
|
||||
- Modify: `src/ScadaLink.Host/StartupValidator.cs:43-48`
|
||||
- Modify: `src/ScadaLink.Host/appsettings.Site.json:7`
|
||||
- Test: `tests/ScadaLink.Host.Tests/`
|
||||
|
||||
**Step 1: Write failing test for GrpcPort validation**
|
||||
|
||||
Add to existing startup validator tests: test that a site node with `GrpcPort` outside 1-65535 fails validation, and that a valid `GrpcPort` passes.
|
||||
|
||||
**Step 2: Run test to verify it fails**
|
||||
|
||||
Run: `dotnet test tests/ScadaLink.Host.Tests/`
|
||||
Expected: New test FAILS (GrpcPort not validated yet)
|
||||
|
||||
**Step 3: Add GrpcPort to NodeOptions**
|
||||
|
||||
In `src/ScadaLink.Host/NodeOptions.cs`, add:
|
||||
```csharp
|
||||
public int GrpcPort { get; set; } = 8083;
|
||||
```
|
||||
|
||||
**Step 4: Add validation in StartupValidator**
|
||||
|
||||
In `src/ScadaLink.Host/StartupValidator.cs`, after the existing site validation block (line ~43):
|
||||
```csharp
|
||||
if (role == "Site")
|
||||
{
|
||||
var grpcPortStr = nodeSection["GrpcPort"];
|
||||
if (grpcPortStr != null && (!int.TryParse(grpcPortStr, out var gp) || gp < 1 || gp > 65535))
|
||||
errors.Add("ScadaLink:Node:GrpcPort must be 1-65535");
|
||||
}
|
||||
```
|
||||
|
||||
**Step 5: Add GrpcPort to appsettings.Site.json**
|
||||
|
||||
In `src/ScadaLink.Host/appsettings.Site.json`, add after `"RemotingPort": 8082`:
|
||||
```json
|
||||
"GrpcPort": 8083
|
||||
```
|
||||
|
||||
**Step 6: Run tests**
|
||||
|
||||
Run: `dotnet test tests/ScadaLink.Host.Tests/`
|
||||
Expected: All pass
|
||||
|
||||
**Step 7: Commit**
|
||||
|
||||
```bash
|
||||
git add src/ScadaLink.Host/ tests/ScadaLink.Host.Tests/
|
||||
git commit -m "feat: add GrpcPort config to NodeOptions with startup validation"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 2: Site Entity — gRPC Address Fields
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/ScadaLink.Commons/Entities/Sites/Site.cs:9-10`
|
||||
- Modify: `src/ScadaLink.Commons/Messages/Management/SiteCommands.cs:5-6`
|
||||
- Modify: `src/ScadaLink.ConfigurationDatabase/` (migration)
|
||||
- Modify: `src/ScadaLink.ManagementService/ManagementActor.cs` (handlers)
|
||||
- Modify: `src/ScadaLink.CLI/Commands/SiteCommands.cs`
|
||||
- Modify: `src/ScadaLink.CentralUI/Components/Pages/Admin/Sites.razor`
|
||||
- Test: `tests/ScadaLink.Commons.Tests/`
|
||||
|
||||
**Step 1: Add fields to Site entity**
|
||||
|
||||
In `src/ScadaLink.Commons/Entities/Sites/Site.cs`, add after `NodeBAddress`:
|
||||
```csharp
|
||||
public string? GrpcNodeAAddress { get; set; }
|
||||
public string? GrpcNodeBAddress { get; set; }
|
||||
```
|
||||
|
||||
**Step 2: Update management commands**
|
||||
|
||||
In `src/ScadaLink.Commons/Messages/Management/SiteCommands.cs`, add `GrpcNodeAAddress` and `GrpcNodeBAddress` optional params to `CreateSiteCommand` and `UpdateSiteCommand`.
|
||||
|
||||
**Step 3: Add EF Core migration**
|
||||
|
||||
Run: `dotnet ef migrations add AddGrpcNodeAddresses --project src/ScadaLink.ConfigurationDatabase/ --startup-project src/ScadaLink.Host/`
|
||||
|
||||
Or create manual migration adding nullable `GrpcNodeAAddress` and `GrpcNodeBAddress` string columns to Sites table.
|
||||
|
||||
**Step 4: Update ManagementActor handlers**
|
||||
|
||||
In `src/ScadaLink.ManagementService/ManagementActor.cs`, update `HandleCreateSite` and `HandleUpdateSite` to pass gRPC addresses to the repository.
|
||||
|
||||
**Step 5: Update CLI SiteCommands**
|
||||
|
||||
In `src/ScadaLink.CLI/Commands/SiteCommands.cs`, add `--grpc-node-a-address` and `--grpc-node-b-address` options to `site create` and `site update` commands.
|
||||
|
||||
**Step 6: Update Central UI Sites.razor**
|
||||
|
||||
In `src/ScadaLink.CentralUI/Components/Pages/Admin/Sites.razor`:
|
||||
- Add `_formGrpcNodeAAddress` and `_formGrpcNodeBAddress` form fields
|
||||
- Add table columns for gRPC addresses
|
||||
- Wire into create/update handlers
|
||||
|
||||
**Step 7: Run tests**
|
||||
|
||||
Run: `dotnet test tests/ScadaLink.Commons.Tests/ && dotnet test tests/ScadaLink.CLI.Tests/ && dotnet test tests/ScadaLink.Host.Tests/`
|
||||
Expected: All pass
|
||||
|
||||
**Step 8: Commit**
|
||||
|
||||
```bash
|
||||
git add src/ScadaLink.Commons/ src/ScadaLink.ConfigurationDatabase/ src/ScadaLink.ManagementService/ src/ScadaLink.CLI/ src/ScadaLink.CentralUI/
|
||||
git commit -m "feat: add GrpcNodeAAddress/GrpcNodeBAddress to Site entity, CLI, and UI"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 3: Site-Side gRPC Server — StreamRelayActor
|
||||
|
||||
**Files:**
|
||||
- Create: `src/ScadaLink.Communication/Grpc/StreamRelayActor.cs`
|
||||
- Test: `tests/ScadaLink.Communication.Tests/Grpc/StreamRelayActorTests.cs`
|
||||
|
||||
**Step 1: Write failing test**
|
||||
|
||||
Test that `StreamRelayActor` receives `AttributeValueChanged` and writes a correctly-converted `SiteStreamEvent` proto message to a `ChannelWriter<SiteStreamEvent>`. Use Akka.TestKit.
|
||||
|
||||
**Step 2: Run test to verify it fails**
|
||||
|
||||
Run: `dotnet test tests/ScadaLink.Communication.Tests/`
|
||||
Expected: FAIL (class doesn't exist)
|
||||
|
||||
**Step 3: Implement StreamRelayActor**
|
||||
|
||||
Create `src/ScadaLink.Communication/Grpc/StreamRelayActor.cs`:
|
||||
- `ReceiveActor` that receives `AttributeValueChanged` and `AlarmStateChanged`
|
||||
- Converts each to the proto `SiteStreamEvent` with correct enum mappings and `Timestamp` conversion
|
||||
- Writes to `ChannelWriter<SiteStreamEvent>` via `TryWrite`
|
||||
- Logs dropped events when channel is full
|
||||
|
||||
**Step 4: Run tests**
|
||||
|
||||
Run: `dotnet test tests/ScadaLink.Communication.Tests/`
|
||||
Expected: All pass
|
||||
|
||||
**Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add src/ScadaLink.Communication/Grpc/ tests/ScadaLink.Communication.Tests/
|
||||
git commit -m "feat: add StreamRelayActor bridging Akka events to gRPC proto channel"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 4: Site-Side gRPC Server — SiteStreamGrpcServer
|
||||
|
||||
**Files:**
|
||||
- Create: `src/ScadaLink.Communication/Grpc/SiteStreamGrpcServer.cs`
|
||||
- Test: `tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcServerTests.cs`
|
||||
|
||||
**Step 1: Write failing tests**
|
||||
|
||||
- Server accepts subscription, relays events from mock SiteStreamManager to gRPC stream
|
||||
- Server cleans up SiteStreamManager subscription on cancellation
|
||||
- Server rejects duplicate `correlation_id` (cancels old stream)
|
||||
- Server enforces max concurrent streams (100), rejects with `ResourceExhausted`
|
||||
- Server rejects with `Unavailable` before actor system is ready
|
||||
|
||||
**Step 2: Run tests to verify they fail**
|
||||
|
||||
Run: `dotnet test tests/ScadaLink.Communication.Tests/`
|
||||
Expected: FAIL
|
||||
|
||||
**Step 3: Implement SiteStreamGrpcServer**
|
||||
|
||||
Create `src/ScadaLink.Communication/Grpc/SiteStreamGrpcServer.cs`:
|
||||
- Inherits `SiteStreamService.SiteStreamServiceBase`
|
||||
- Injects `SiteStreamManager` (or interface), `ActorSystem`
|
||||
- Tracks active streams in `ConcurrentDictionary<string, CancellationTokenSource>`
|
||||
- `SubscribeInstance`: creates `Channel<SiteStreamEvent>`, creates `StreamRelayActor`, subscribes to SiteStreamManager, reads channel → writes to gRPC response stream
|
||||
- `finally`: removes subscription, stops relay actor, removes from active streams
|
||||
- Readiness gate: checks `ActorSystem` availability before accepting
|
||||
|
||||
**Step 4: Run tests**
|
||||
|
||||
Run: `dotnet test tests/ScadaLink.Communication.Tests/`
|
||||
Expected: All pass
|
||||
|
||||
**Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add src/ScadaLink.Communication/Grpc/ tests/ScadaLink.Communication.Tests/
|
||||
git commit -m "feat: add SiteStreamGrpcServer with Channel<T> bridge and stream limits"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 5: Switch Site Host to WebApplicationBuilder + gRPC
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/ScadaLink.Host/Program.cs:157-174`
|
||||
- Modify: `src/ScadaLink.Host/appsettings.Site.json`
|
||||
- Modify: `docker/docker-compose.yml`
|
||||
- Test: `tests/ScadaLink.Host.Tests/`
|
||||
|
||||
**Step 1: Write failing test**
|
||||
|
||||
Site host startup test: verify `WebApplicationBuilder` starts, gRPC port is configured, `MapGrpcService` is registered.
|
||||
|
||||
**Step 2: Switch site host from generic Host to WebApplicationBuilder**
|
||||
|
||||
In `src/ScadaLink.Host/Program.cs`, replace the `Host.CreateDefaultBuilder()` site section with `WebApplication.CreateBuilder()` + Kestrel HTTP/2 on `GrpcPort` + `AddGrpc()` + `MapGrpcService<SiteStreamGrpcServer>()`. Keep all existing service registrations via `SiteServiceRegistration.Configure()`.
|
||||
|
||||
Add gRPC keepalive settings from `CommunicationOptions`:
|
||||
- `KeepAlivePingDelay = 15s`
|
||||
- `KeepAlivePingTimeout = 10s`
|
||||
|
||||
**Step 3: Update docker-compose.yml**
|
||||
|
||||
Expose gRPC port 8083 for each site node:
|
||||
- Site-A: `9023:8083` / `9024:8083`
|
||||
- Site-B: `9033:8083` / `9034:8083`
|
||||
- Site-C: `9043:8083` / `9044:8083`
|
||||
|
||||
**Step 4: Add gRPC keepalive config to CommunicationOptions**
|
||||
|
||||
Add to `src/ScadaLink.Communication/CommunicationOptions.cs`:
|
||||
```csharp
|
||||
public TimeSpan GrpcKeepAlivePingDelay { get; set; } = TimeSpan.FromSeconds(15);
|
||||
public TimeSpan GrpcKeepAlivePingTimeout { get; set; } = TimeSpan.FromSeconds(10);
|
||||
public TimeSpan GrpcMaxStreamLifetime { get; set; } = TimeSpan.FromHours(4);
|
||||
public int GrpcMaxConcurrentStreams { get; set; } = 100;
|
||||
```
|
||||
|
||||
**Step 5: Run tests and build**
|
||||
|
||||
Run: `dotnet build src/ScadaLink.Host/ && dotnet test tests/ScadaLink.Host.Tests/`
|
||||
Expected: All pass
|
||||
|
||||
**Step 6: Commit**
|
||||
|
||||
```bash
|
||||
git add src/ScadaLink.Host/ src/ScadaLink.Communication/ docker/
|
||||
git commit -m "feat: switch site host to WebApplicationBuilder with Kestrel gRPC server"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 6: Central-Side gRPC Client
|
||||
|
||||
**Files:**
|
||||
- Create: `src/ScadaLink.Communication/Grpc/SiteStreamGrpcClient.cs`
|
||||
- Create: `src/ScadaLink.Communication/Grpc/SiteStreamGrpcClientFactory.cs`
|
||||
- Test: `tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcClientTests.cs`
|
||||
- Test: `tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcClientFactoryTests.cs`
|
||||
|
||||
**Step 1: Write failing tests for SiteStreamGrpcClient**
|
||||
|
||||
- Client connects, reads stream, converts proto→domain types (`AttributeValueChanged`, `AlarmStateChanged`), invokes callback
|
||||
- Client handles stream errors (throws on `RpcException`)
|
||||
- Client cancellation stops the background reader
|
||||
|
||||
**Step 2: Implement SiteStreamGrpcClient**
|
||||
|
||||
- Creates `GrpcChannel` with keepalive settings from `CommunicationOptions`
|
||||
- `SubscribeAsync`: calls `SiteStreamService.SubscribeInstance()`, launches background task to read `ResponseStream`, converts proto→domain, invokes callback
|
||||
- `Unsubscribe`: cancels the `CancellationTokenSource` for the subscription
|
||||
- `IAsyncDisposable`: disposes channel
|
||||
|
||||
**Step 3: Write failing tests for SiteStreamGrpcClientFactory**
|
||||
|
||||
- Creates and caches per-site clients
|
||||
- Falls back to NodeB on NodeA connection failure
|
||||
- Disposes clients on site removal
|
||||
|
||||
**Step 4: Implement SiteStreamGrpcClientFactory**
|
||||
|
||||
- `GetOrCreateAsync(siteIdentifier, grpcNodeAAddress, grpcNodeBAddress)` → `SiteStreamGrpcClient`
|
||||
- Caches by `siteIdentifier` in `ConcurrentDictionary`
|
||||
- Manages `GrpcChannel` lifecycle
|
||||
|
||||
**Step 5: Run tests**
|
||||
|
||||
Run: `dotnet test tests/ScadaLink.Communication.Tests/`
|
||||
Expected: All pass
|
||||
|
||||
**Step 6: Commit**
|
||||
|
||||
```bash
|
||||
git add src/ScadaLink.Communication/Grpc/ tests/ScadaLink.Communication.Tests/
|
||||
git commit -m "feat: add SiteStreamGrpcClient and SiteStreamGrpcClientFactory"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 7: Update DebugStreamBridgeActor to Use gRPC
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/ScadaLink.Communication/Actors/DebugStreamBridgeActor.cs`
|
||||
- Modify: `src/ScadaLink.Communication/DebugStreamService.cs`
|
||||
- Modify: `src/ScadaLink.Communication/ServiceCollectionExtensions.cs`
|
||||
- Test: `tests/ScadaLink.Communication.Tests/`
|
||||
|
||||
**Step 1: Write failing tests for updated bridge actor**
|
||||
|
||||
- Bridge actor sends subscribe via ClusterClient, receives snapshot
|
||||
- After snapshot, opens gRPC stream via `SiteStreamGrpcClient`
|
||||
- Events from gRPC callback forwarded to `_onEvent`
|
||||
- On gRPC stream error: reconnects to other node with backoff (max 3 retries)
|
||||
- On stop: cancels gRPC + sends unsubscribe via ClusterClient
|
||||
- Handles `DebugStreamTerminated` idempotently
|
||||
|
||||
**Step 2: Update DebugStreamBridgeActor**
|
||||
|
||||
Rewrite to:
|
||||
1. `PreStart`: send `SubscribeDebugViewRequest` via ClusterClient (unchanged)
|
||||
2. On `DebugViewSnapshot` received: open gRPC stream first (per handoff race mitigation — stream first, then apply snapshot)
|
||||
3. gRPC callback delivers events to `Self` via `Tell` (marshals onto actor thread)
|
||||
4. On gRPC error: enter reconnecting state, try other node, backoff, max retries
|
||||
5. On stop: cancel gRPC subscription + send `UnsubscribeDebugViewRequest`
|
||||
|
||||
**Step 3: Update DebugStreamService**
|
||||
|
||||
Inject `SiteStreamGrpcClientFactory`. Resolve `GrpcNodeAAddress`/`GrpcNodeBAddress` from `Site` entity. Pass to bridge actor.
|
||||
|
||||
**Step 4: Register factory in DI**
|
||||
|
||||
In `src/ScadaLink.Communication/ServiceCollectionExtensions.cs`:
|
||||
```csharp
|
||||
services.AddSingleton<SiteStreamGrpcClientFactory>();
|
||||
```
|
||||
|
||||
**Step 5: Run tests**
|
||||
|
||||
Run: `dotnet test tests/ScadaLink.Communication.Tests/`
|
||||
Expected: All pass
|
||||
|
||||
**Step 6: Commit**
|
||||
|
||||
```bash
|
||||
git add src/ScadaLink.Communication/ tests/ScadaLink.Communication.Tests/
|
||||
git commit -m "feat: update DebugStreamBridgeActor to use gRPC for streaming events"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 8: Remove ClusterClient Streaming Path
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/ScadaLink.SiteRuntime/Actors/InstanceActor.cs`
|
||||
- Modify: `src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs`
|
||||
- Modify: `src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs`
|
||||
- Delete: `src/ScadaLink.Commons/Messages/DebugView/DebugStreamEvent.cs`
|
||||
- Test: `tests/ScadaLink.SiteRuntime.Tests/Actors/InstanceActorIntegrationTests.cs`
|
||||
- Test: `tests/ScadaLink.Commons.Tests/ArchitecturalConstraintTests.cs`
|
||||
|
||||
**Step 1: Remove DebugStreamEvent from InstanceActor**
|
||||
|
||||
Remove `_debugSubscriberCorrelationIds`, `_siteCommActor`, and all `DebugStreamEvent` forwarding from `PublishAndNotifyChildren` and `HandleAlarmStateChanged`. InstanceActor just publishes to `SiteStreamManager` — the gRPC server picks up events from there.
|
||||
|
||||
Keep `HandleSubscribeDebugView` (for snapshot) and `HandleUnsubscribeDebugView`.
|
||||
|
||||
**Step 2: Remove DebugStreamEvent from SiteCommunicationActor**
|
||||
|
||||
Remove `Receive<DebugStreamEvent>` handler.
|
||||
|
||||
**Step 3: Remove DebugStreamEvent from CentralCommunicationActor**
|
||||
|
||||
Remove `Receive<DebugStreamEvent>` handler and `HandleDebugStreamEvent` method.
|
||||
|
||||
**Step 4: Delete DebugStreamEvent.cs**
|
||||
|
||||
Delete `src/ScadaLink.Commons/Messages/DebugView/DebugStreamEvent.cs`.
|
||||
|
||||
**Step 5: Update InstanceActorIntegrationTests**
|
||||
|
||||
Remove `DebugStreamEventForwarder` test helper. Update debug subscriber tests to verify events reach `SiteStreamManager` only.
|
||||
|
||||
**Step 6: Add architectural constraint test**
|
||||
|
||||
In `tests/ScadaLink.Commons.Tests/ArchitecturalConstraintTests.cs`, add test verifying `DebugStreamEvent` type no longer exists in the Commons assembly.
|
||||
|
||||
**Step 7: Run full test suite**
|
||||
|
||||
Run: `dotnet test tests/ScadaLink.SiteRuntime.Tests/ && dotnet test tests/ScadaLink.Communication.Tests/ && dotnet test tests/ScadaLink.Commons.Tests/ && dotnet test tests/ScadaLink.Host.Tests/`
|
||||
Expected: All pass
|
||||
|
||||
**Step 8: Commit**
|
||||
|
||||
```bash
|
||||
git add -A
|
||||
git commit -m "refactor: remove ClusterClient streaming path (DebugStreamEvent), events flow via gRPC"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 9: Docker & End-to-End Integration Test
|
||||
|
||||
**Files:**
|
||||
- Modify: `docker/docker-compose.yml`
|
||||
- Modify: `docker/deploy.sh` (if needed)
|
||||
- Create: `tests/ScadaLink.IntegrationTests/Grpc/GrpcStreamIntegrationTests.cs`
|
||||
|
||||
**Step 1: Update docker-compose site appsettings**
|
||||
|
||||
Ensure site container configs include `GrpcPort: 8083` and gRPC ports are exposed.
|
||||
|
||||
**Step 2: Write integration test**
|
||||
|
||||
End-to-end: start in-process site gRPC server → central gRPC client → verify event delivery, cancellation cleanup.
|
||||
|
||||
**Step 3: Build and deploy cluster**
|
||||
|
||||
Run: `bash docker/deploy.sh`
|
||||
Expected: All containers start, gRPC ports exposed
|
||||
|
||||
**Step 4: Manual end-to-end verification**
|
||||
|
||||
Run: `timeout 35 dotnet run --project src/ScadaLink.CLI -- --url http://localhost:9000 --username multi-role --password password debug stream --id 1 --format table`
|
||||
|
||||
Expected: Initial snapshot + streaming ATTR/ALARM rows via gRPC (not ClusterClient).
|
||||
|
||||
Write an OPC UA tag to verify:
|
||||
```bash
|
||||
python3 infra/tools/opcua_tool.py write --node "ns=3;s=JoeAppEngine.BTCS" --value "gRPC streaming test" --type String
|
||||
```
|
||||
|
||||
**Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add tests/ScadaLink.IntegrationTests/ docker/
|
||||
git commit -m "test: add gRPC stream integration test and docker config"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 10: Documentation Updates
|
||||
|
||||
**Files:**
|
||||
- Modify: `docs/requirements/HighLevelReqs.md`
|
||||
- Modify: `docs/requirements/Component-Communication.md`
|
||||
- Modify: `docs/requirements/Component-SiteRuntime.md`
|
||||
- Modify: `docs/requirements/Component-Host.md`
|
||||
- Modify: `docs/requirements/Component-CentralUI.md`
|
||||
- Modify: `docs/requirements/Component-CLI.md`
|
||||
- Modify: `docs/requirements/Component-ConfigurationDatabase.md`
|
||||
- Modify: `docs/requirements/Component-ClusterInfrastructure.md`
|
||||
- Modify: `CLAUDE.md`
|
||||
- Modify: `README.md`
|
||||
- Modify: `docker/README.md`
|
||||
|
||||
**Step 1: Update HighLevelReqs.md**
|
||||
|
||||
Section 5 (Communication): Add gRPC streaming transport. ClusterClient for command/control, gRPC for real-time data.
|
||||
|
||||
**Step 2: Update Component-Communication.md**
|
||||
|
||||
Pattern 6: Replace ClusterClient streaming with gRPC. Add SiteStreamGrpcServer, SiteStreamGrpcClient, SiteStreamGrpcClientFactory. Add gRPC keepalive config.
|
||||
|
||||
**Step 3: Update remaining component docs**
|
||||
|
||||
Per the documentation update table in `docs/plans/grpc_streams.md` § Documentation Updates.
|
||||
|
||||
**Step 4: Update CLAUDE.md**
|
||||
|
||||
Add under Data & Communication: "gRPC streaming for site→central real-time data; ClusterClient for command/control only"
|
||||
|
||||
**Step 5: Update README.md architecture diagram**
|
||||
|
||||
Add gRPC streaming channel between site and central in the ASCII diagram.
|
||||
|
||||
**Step 6: Update docker/README.md**
|
||||
|
||||
Add gRPC ports to port allocation table.
|
||||
|
||||
**Step 7: Commit**
|
||||
|
||||
```bash
|
||||
git add docs/ CLAUDE.md README.md docker/README.md
|
||||
git commit -m "docs: update requirements and architecture for gRPC streaming channel"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 11: Final Guardrail Tests
|
||||
|
||||
**Files:**
|
||||
- Test: `tests/ScadaLink.Communication.Tests/Grpc/ProtoContractTests.cs`
|
||||
- Test: `tests/ScadaLink.Communication.Tests/Grpc/CleanupVerificationTests.cs`
|
||||
|
||||
**Step 1: Proto contract test**
|
||||
|
||||
Verify all `oneof` variants in `SiteStreamEvent` have corresponding handlers in `StreamRelayActor` and `ConvertToDomainEvent`. If a new proto field is added without handlers, the test fails.
|
||||
|
||||
**Step 2: Cleanup verification test**
|
||||
|
||||
Verify that after gRPC stream cancellation, `SiteStreamManager.SubscriptionCount` returns to zero (no leaked subscriptions).
|
||||
|
||||
**Step 3: No ClusterClient streaming regression test**
|
||||
|
||||
Integration test that subscribes via gRPC, triggers changes, and verifies events arrive via gRPC — NOT via `DebugStreamEvent`.
|
||||
|
||||
**Step 4: Run full test suite**
|
||||
|
||||
Run: `dotnet test tests/ScadaLink.Host.Tests/ && dotnet test tests/ScadaLink.Communication.Tests/ && dotnet test tests/ScadaLink.SiteRuntime.Tests/ && dotnet test tests/ScadaLink.Commons.Tests/ && dotnet test tests/ScadaLink.CLI.Tests/ && dotnet test tests/ScadaLink.ManagementService.Tests/`
|
||||
Expected: All pass, zero warnings
|
||||
|
||||
**Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add tests/
|
||||
git commit -m "test: add proto contract, cleanup verification, and regression guardrail tests"
|
||||
```
|
||||
18
docs/plans/2026-03-21-grpc-streaming-channel.md.tasks.json
Normal file
18
docs/plans/2026-03-21-grpc-streaming-channel.md.tasks.json
Normal file
@@ -0,0 +1,18 @@
|
||||
{
|
||||
"planPath": "docs/plans/2026-03-21-grpc-streaming-channel.md",
|
||||
"tasks": [
|
||||
{"id": 0, "taskId": "1", "subject": "Task 0: Proto Definition & Stub Generation", "status": "pending"},
|
||||
{"id": 1, "taskId": "2", "subject": "Task 1: Site Config — GrpcPort in NodeOptions", "status": "pending", "blockedBy": [0]},
|
||||
{"id": 2, "taskId": "3", "subject": "Task 2: Site Entity — gRPC Address Fields", "status": "pending", "blockedBy": [0]},
|
||||
{"id": 3, "taskId": "4", "subject": "Task 3: Site-Side gRPC Server — StreamRelayActor", "status": "pending", "blockedBy": [0]},
|
||||
{"id": 4, "taskId": "5", "subject": "Task 4: Site-Side gRPC Server — SiteStreamGrpcServer", "status": "pending", "blockedBy": [3]},
|
||||
{"id": 5, "taskId": "6", "subject": "Task 5: Switch Site Host to WebApplicationBuilder + gRPC", "status": "pending", "blockedBy": [4]},
|
||||
{"id": 6, "taskId": "7", "subject": "Task 6: Central-Side gRPC Client", "status": "pending", "blockedBy": [0]},
|
||||
{"id": 7, "taskId": "8", "subject": "Task 7: Update DebugStreamBridgeActor to Use gRPC", "status": "pending", "blockedBy": [6, 5]},
|
||||
{"id": 8, "taskId": "9", "subject": "Task 8: Remove ClusterClient Streaming Path", "status": "pending", "blockedBy": [7]},
|
||||
{"id": 9, "taskId": "10", "subject": "Task 9: Docker & End-to-End Integration Test", "status": "pending", "blockedBy": [5, 1, 2]},
|
||||
{"id": 10, "taskId": "11", "subject": "Task 10: Documentation Updates", "status": "pending", "blockedBy": [9]},
|
||||
{"id": 11, "taskId": "12", "subject": "Task 11: Final Guardrail Tests", "status": "pending", "blockedBy": [9]}
|
||||
],
|
||||
"lastUpdated": "2026-03-21T14:15:00Z"
|
||||
}
|
||||
Reference in New Issue
Block a user