gRPC Streaming Channel Implementation Plan
For Claude: REQUIRED SUB-SKILL: Use superpowers-extended-cc:executing-plans to implement this plan task-by-task.
Goal: Replace ClusterClient-based debug event streaming with a dedicated gRPC server-streaming channel from site nodes to central, following the design in docs/plans/grpc_streams.md.
Architecture: Each site node runs a gRPC server (SiteStreamGrpcServer) on a dedicated HTTP/2 port. Central creates per-site gRPC clients (SiteStreamGrpcClient) that open server-streaming subscriptions filtered by instance name. Events flow: SiteStreamManager → relay actor → Channel<T> → gRPC stream → central callback → DebugStreamBridgeActor → SignalR/Blazor. ClusterClient continues handling command/control (subscribe/unsubscribe/snapshot).
Tech Stack: .NET 10, gRPC (Grpc.AspNetCore + Grpc.Net.Client), Protocol Buffers, Akka.NET, ASP.NET Core Kestrel, System.Threading.Channels
Design Reference: docs/plans/grpc_streams.md — full architecture, proto definition, failover, keepalive, backpressure, and review notes.
Task 0: Proto Definition & Stub Generation
Files:
- Create: src/ScadaLink.Communication/Protos/sitestream.proto
- Create: src/ScadaLink.Communication/SiteStreamGrpc/ (generated stubs)
- Modify: src/ScadaLink.Communication/ScadaLink.Communication.csproj
Step 1: Create the proto file
Create src/ScadaLink.Communication/Protos/sitestream.proto with the proto definition from docs/plans/grpc_streams.md "Proto Improvements" section (V1 review notes version with enums and google.protobuf.Timestamp):
```protobuf
syntax = "proto3";
option csharp_namespace = "ScadaLink.Communication.Grpc";
package sitestream;
import "google/protobuf/timestamp.proto";

service SiteStreamService {
  rpc SubscribeInstance(InstanceStreamRequest) returns (stream SiteStreamEvent);
}

message InstanceStreamRequest {
  string correlation_id = 1;
  string instance_unique_name = 2;
}

message SiteStreamEvent {
  string correlation_id = 1;
  oneof event {
    AttributeValueUpdate attribute_changed = 2;
    AlarmStateUpdate alarm_changed = 3;
  }
}

enum Quality {
  QUALITY_UNSPECIFIED = 0;
  QUALITY_GOOD = 1;
  QUALITY_UNCERTAIN = 2;
  QUALITY_BAD = 3;
}

enum AlarmStateEnum {
  ALARM_STATE_UNSPECIFIED = 0;
  ALARM_STATE_NORMAL = 1;
  ALARM_STATE_ACTIVE = 2;
}

message AttributeValueUpdate {
  string instance_unique_name = 1;
  string attribute_path = 2;
  string attribute_name = 3;
  string value = 4;
  Quality quality = 5;
  google.protobuf.Timestamp timestamp = 6;
}

message AlarmStateUpdate {
  string instance_unique_name = 1;
  string alarm_name = 2;
  AlarmStateEnum state = 3;
  int32 priority = 4;
  google.protobuf.Timestamp timestamp = 5;
}
```
Step 2: Add gRPC NuGet packages
Add to src/ScadaLink.Communication/ScadaLink.Communication.csproj:
```xml
<PackageReference Include="Grpc.AspNetCore" Version="2.71.0" />
<PackageReference Include="Grpc.Net.Client" Version="2.71.0" />
<PackageReference Include="Google.Protobuf" Version="3.29.3" />
<PackageReference Include="Grpc.Tools" Version="2.71.0" PrivateAssets="All" />
```
Also add <FrameworkReference Include="Microsoft.AspNetCore.App" /> if not already present (needed for Grpc.AspNetCore).
Step 3: Generate C# stubs
Run protoc locally to generate stubs. Check generated files into src/ScadaLink.Communication/SiteStreamGrpc/. Follow the same pattern as src/ScadaLink.DataConnectionLayer/Adapters/LmxProxyGrpc/ — pre-generated, no protoc at build time.
Step 4: Verify build
Run: dotnet build src/ScadaLink.Communication/
Expected: Build succeeded, 0 errors
Step 5: Write proto roundtrip tests
Create tests/ScadaLink.Communication.Tests/Grpc/ProtoRoundtripTests.cs:
- Test AttributeValueUpdate serialization/deserialization with all Quality enum values
- Test AlarmStateUpdate serialization/deserialization with all AlarmStateEnum values
- Test SiteStreamEvent oneof discrimination (attribute vs. alarm)
- Test google.protobuf.Timestamp conversion to/from DateTimeOffset
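For the Timestamp test it helps to pin down the encoding: google.protobuf.Timestamp stores whole seconds since the Unix epoch plus a nanos field that must stay in [0, 999999999], even for instants before 1970. The BCL-only sketch below (TimestampMath is a hypothetical name) mirrors that arithmetic so the expected values are explicit; the generated code itself would use Google.Protobuf's Timestamp.FromDateTimeOffset and ToDateTimeOffset. The test can assert the same two properties: an instant survives the roundtrip, and a pre-epoch value normalizes to negative seconds with non-negative nanos.

```csharp
using System;

// Hypothetical helper mirroring google.protobuf.Timestamp's (seconds, nanos) encoding.
// Production code should use Google.Protobuf.WellKnownTypes.Timestamp instead.
public static class TimestampMath
{
    private const long TicksPerSecond = TimeSpan.TicksPerSecond; // 10,000,000 ticks of 100 ns
    private const int NanosPerTick = 100;

    // Seconds since the Unix epoch plus a non-negative nanosecond remainder.
    public static (long Seconds, int Nanos) ToSecondsNanos(DateTimeOffset value)
    {
        long ticks = value.UtcTicks - DateTimeOffset.UnixEpoch.UtcTicks;
        long seconds = Math.DivRem(ticks, TicksPerSecond, out long remainderTicks);
        if (remainderTicks < 0)
        {
            // Timestamp requires nanos >= 0, so borrow one second for pre-epoch values.
            seconds -= 1;
            remainderTicks += TicksPerSecond;
        }
        return (seconds, (int)(remainderTicks * NanosPerTick));
    }

    public static DateTimeOffset FromSecondsNanos(long seconds, int nanos)
    {
        if (nanos < 0 || nanos > 999_999_999)
            throw new ArgumentOutOfRangeException(nameof(nanos));
        // DateTimeOffset has 100 ns resolution, so any sub-tick nanos are truncated.
        long ticks = seconds * TicksPerSecond + nanos / NanosPerTick;
        return DateTimeOffset.UnixEpoch.AddTicks(ticks);
    }
}
```

DateTimeOffset equality compares UTC instants, so a value with a non-zero offset still compares equal after the roundtrip even though it comes back as +00:00.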
Step 6: Run tests
Run: dotnet test tests/ScadaLink.Communication.Tests/
Expected: All pass including new proto tests
Step 7: Commit
git add src/ScadaLink.Communication/Protos/ src/ScadaLink.Communication/SiteStreamGrpc/ src/ScadaLink.Communication/ScadaLink.Communication.csproj tests/ScadaLink.Communication.Tests/
git commit -m "feat: add sitestream.proto definition and generated gRPC stubs"
Task 1: Site Config — GrpcPort in NodeOptions
Files:
- Modify: src/ScadaLink.Host/NodeOptions.cs:8
- Modify: src/ScadaLink.Host/StartupValidator.cs:43-48
- Modify: src/ScadaLink.Host/appsettings.Site.json:7
- Test: tests/ScadaLink.Host.Tests/
Step 1: Write failing test for GrpcPort validation
Add to existing startup validator tests: test that a site node with GrpcPort outside 1-65535 fails validation, and that a valid GrpcPort passes.
Step 2: Run test to verify it fails
Run: dotnet test tests/ScadaLink.Host.Tests/
Expected: New test FAILS (GrpcPort not validated yet)
Step 3: Add GrpcPort to NodeOptions
In src/ScadaLink.Host/NodeOptions.cs, add:
```csharp
public int GrpcPort { get; set; } = 8083;
```
Step 4: Add validation in StartupValidator
In src/ScadaLink.Host/StartupValidator.cs, after the existing site validation block (line ~43):
```csharp
if (role == "Site")
{
    // GrpcPort is optional (NodeOptions defaults to 8083); validate it only when it is set.
    var grpcPortStr = nodeSection["GrpcPort"];
    if (grpcPortStr != null && (!int.TryParse(grpcPortStr, out var gp) || gp < 1 || gp > 65535))
        errors.Add("ScadaLink:Node:GrpcPort must be 1-65535");
}
```
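Pulling the rule into a pure function keeps the Step 1 tests independent of configuration plumbing. A minimal sketch, assuming a hypothetical GrpcPortRule helper that StartupValidator would call with nodeSection["GrpcPort"] and whose non-null result it would add to errors:

```csharp
using System;

// Hypothetical helper holding the same rule as the StartupValidator snippet above.
public static class GrpcPortRule
{
    public const string Error = "ScadaLink:Node:GrpcPort must be 1-65535";

    // Returns null when valid, otherwise the error message.
    // A missing value (null) is valid because NodeOptions falls back to its default port.
    public static string? Validate(string? raw)
    {
        if (raw is null) return null;
        return int.TryParse(raw, out var port) && port is >= 1 and <= 65535 ? null : Error;
    }
}
```

The Step 1 cases then reduce to one-liners: null and "8083" pass; "0", "65536", and a non-numeric string fail.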
Step 5: Add GrpcPort to appsettings.Site.json
In src/ScadaLink.Host/appsettings.Site.json, add after "RemotingPort": 8082 (keep the properties comma-separated):
```json
"GrpcPort": 8083
```
Step 6: Run tests
Run: dotnet test tests/ScadaLink.Host.Tests/
Expected: All pass
Step 7: Commit
git add src/ScadaLink.Host/ tests/ScadaLink.Host.Tests/
git commit -m "feat: add GrpcPort config to NodeOptions with startup validation"
Task 2: Site Entity — gRPC Address Fields
Files:
- Modify: src/ScadaLink.Commons/Entities/Sites/Site.cs:9-10
- Modify: src/ScadaLink.Commons/Messages/Management/SiteCommands.cs:5-6
- Modify: src/ScadaLink.ConfigurationDatabase/ (migration)
- Modify: src/ScadaLink.ManagementService/ManagementActor.cs (handlers)
- Modify: src/ScadaLink.CLI/Commands/SiteCommands.cs
- Modify: src/ScadaLink.CentralUI/Components/Pages/Admin/Sites.razor
- Test: tests/ScadaLink.Commons.Tests/
Step 1: Add fields to Site entity
In src/ScadaLink.Commons/Entities/Sites/Site.cs, add after NodeBAddress:
```csharp
public string? GrpcNodeAAddress { get; set; }
public string? GrpcNodeBAddress { get; set; }
```
Step 2: Update management commands
In src/ScadaLink.Commons/Messages/Management/SiteCommands.cs, add GrpcNodeAAddress and GrpcNodeBAddress optional params to CreateSiteCommand and UpdateSiteCommand.
Step 3: Add EF Core migration
Run: dotnet ef migrations add AddGrpcNodeAddresses --project src/ScadaLink.ConfigurationDatabase/ --startup-project src/ScadaLink.Host/
Or create manual migration adding nullable GrpcNodeAAddress and GrpcNodeBAddress string columns to Sites table.
Step 4: Update ManagementActor handlers
In src/ScadaLink.ManagementService/ManagementActor.cs, update HandleCreateSite and HandleUpdateSite to pass gRPC addresses to the repository.
Step 5: Update CLI SiteCommands
In src/ScadaLink.CLI/Commands/SiteCommands.cs, add --grpc-node-a-address and --grpc-node-b-address options to site create and site update commands.
Step 6: Update Central UI Sites.razor
In src/ScadaLink.CentralUI/Components/Pages/Admin/Sites.razor:
- Add _formGrpcNodeAAddress and _formGrpcNodeBAddress form fields
- Add table columns for gRPC addresses
- Wire into create/update handlers
Step 7: Run tests
Run: dotnet test tests/ScadaLink.Commons.Tests/ && dotnet test tests/ScadaLink.CLI.Tests/ && dotnet test tests/ScadaLink.Host.Tests/
Expected: All pass
Step 8: Commit
git add src/ScadaLink.Commons/ src/ScadaLink.ConfigurationDatabase/ src/ScadaLink.ManagementService/ src/ScadaLink.CLI/ src/ScadaLink.CentralUI/
git commit -m "feat: add GrpcNodeAAddress/GrpcNodeBAddress to Site entity, CLI, and UI"
Task 3: Site-Side gRPC Server — StreamRelayActor
Files:
- Create: src/ScadaLink.Communication/Grpc/StreamRelayActor.cs
- Test: tests/ScadaLink.Communication.Tests/Grpc/StreamRelayActorTests.cs
Step 1: Write failing test
Test that StreamRelayActor receives AttributeValueChanged and writes a correctly-converted SiteStreamEvent proto message to a ChannelWriter<SiteStreamEvent>. Use Akka.TestKit.
Step 2: Run test to verify it fails
Run: dotnet test tests/ScadaLink.Communication.Tests/
Expected: FAIL (class doesn't exist)
Step 3: Implement StreamRelayActor
Create src/ScadaLink.Communication/Grpc/StreamRelayActor.cs:
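- ReceiveActor that receives AttributeValueChanged and AlarmStateChanged
- Converts each to the proto SiteStreamEvent with correct enum mappings and Timestamp conversion
- Writes to ChannelWriter<SiteStreamEvent> via TryWrite
- Logs dropped events when the channel is full (TryWrite returns false on a full bounded channel only with BoundedChannelFullMode.Wait; the Drop* modes report success and discard an item silently)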
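The drop-logging bullet depends on how the channel is created. A sketch with a generic T standing in for SiteStreamEvent, assuming a bounded channel in BoundedChannelFullMode.Wait so that a non-blocking TryWrite from the actor can report a full buffer; the EventRelaySketch name, the capacity, and the Func-based sink standing in for the gRPC response stream are all hypothetical:

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical relay core; T stands in for the generated SiteStreamEvent.
public sealed class EventRelaySketch<T>
{
    private readonly Channel<T> _channel;
    private long _dropped;

    public EventRelaySketch(int capacity)
    {
        _channel = Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
        {
            // Wait mode: TryWrite returns false when the buffer is full instead of
            // silently discarding an item (which is what the Drop* modes do).
            FullMode = BoundedChannelFullMode.Wait,
            SingleReader = true, // only the gRPC response loop reads
        });
    }

    public long DroppedCount => Interlocked.Read(ref _dropped);

    // Called from the actor's Receive handlers; never blocks the actor thread.
    // TryWrite also returns false after Complete(), so late events count as dropped.
    public bool Publish(T item)
    {
        if (_channel.Writer.TryWrite(item)) return true;
        Interlocked.Increment(ref _dropped); // the real actor would log the dropped event here
        return false;
    }

    // Called during stream teardown; lets PumpAsync finish once the buffer drains.
    public void Complete() => _channel.Writer.TryComplete();

    // Server side: drain the channel into the response stream (e.g. responseStream.WriteAsync).
    public async Task PumpAsync(Func<T, Task> writeAsync, CancellationToken ct)
    {
        await foreach (var item in _channel.Reader.ReadAllAsync(ct))
            await writeAsync(item);
    }
}
```

The actor only ever calls Publish and never awaits, so a slow central consumer costs dropped events rather than a stalled actor mailbox.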
Step 4: Run tests
Run: dotnet test tests/ScadaLink.Communication.Tests/
Expected: All pass
Step 5: Commit
git add src/ScadaLink.Communication/Grpc/ tests/ScadaLink.Communication.Tests/
git commit -m "feat: add StreamRelayActor bridging Akka events to gRPC proto channel"
Task 4: Site-Side gRPC Server — SiteStreamGrpcServer
Files:
- Create: src/ScadaLink.Communication/Grpc/SiteStreamGrpcServer.cs
- Test: tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcServerTests.cs
Step 1: Write failing tests
- Server accepts subscription, relays events from mock SiteStreamManager to gRPC stream
- Server cleans up SiteStreamManager subscription on cancellation
- Server handles a duplicate correlation_id by cancelling the old stream
- Server enforces max concurrent streams (100), rejects with ResourceExhausted
- Server rejects with Unavailable before the actor system is ready
Step 2: Run tests to verify they fail
Run: dotnet test tests/ScadaLink.Communication.Tests/
Expected: FAIL
Step 3: Implement SiteStreamGrpcServer
Create src/ScadaLink.Communication/Grpc/SiteStreamGrpcServer.cs:
- Inherits SiteStreamService.SiteStreamServiceBase
- Injects SiteStreamManager (or an interface) and ActorSystem
- Tracks active streams in ConcurrentDictionary<string, CancellationTokenSource>
- SubscribeInstance: creates Channel<SiteStreamEvent>, creates StreamRelayActor, subscribes to SiteStreamManager, reads channel → writes to gRPC response stream
- finally: removes subscription, stops relay actor, removes from active streams
- Readiness gate: checks ActorSystem availability before accepting
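The duplicate-id and stream-limit rules interact with the finally cleanup: when a new stream replaces an old one under the same correlation_id, the old stream's finally block must not remove the entry that now belongs to its replacement. A sketch of that bookkeeping, assuming a hypothetical ActiveStreamRegistry; it swaps the ConcurrentDictionary named above for a lock-protected Dictionary so the limit check and the insert happen atomically:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public enum RegisterResult { Registered, ReplacedExisting, LimitReached }

// Hypothetical registry of active streams keyed by correlation_id.
public sealed class ActiveStreamRegistry
{
    private readonly object _gate = new();
    private readonly Dictionary<string, CancellationTokenSource> _streams = new();
    private readonly int _maxStreams;

    public ActiveStreamRegistry(int maxStreams) => _maxStreams = maxStreams;

    public int Count { get { lock (_gate) return _streams.Count; } }

    public RegisterResult TryRegister(string correlationId, CancellationTokenSource cts)
    {
        CancellationTokenSource? previous;
        lock (_gate)
        {
            if (_streams.TryGetValue(correlationId, out previous))
            {
                _streams[correlationId] = cts; // duplicate id: the new stream takes the slot
            }
            else
            {
                if (_streams.Count >= _maxStreams)
                    return RegisterResult.LimitReached; // caller maps this to StatusCode.ResourceExhausted
                _streams.Add(correlationId, cts);
            }
        }

        if (previous is null) return RegisterResult.Registered;
        previous.Cancel(); // outside the lock: cancellation callbacks run synchronously
        return RegisterResult.ReplacedExisting;
    }

    // Called from SubscribeInstance's finally block. Removes the entry only if it still
    // belongs to this stream, so a replaced stream cannot evict its replacement.
    public void Unregister(string correlationId, CancellationTokenSource cts)
    {
        lock (_gate)
        {
            if (_streams.TryGetValue(correlationId, out var current) && ReferenceEquals(current, cts))
                _streams.Remove(correlationId);
        }
    }
}
```

With a ConcurrentDictionary, the same conditional removal is available as TryRemove(KeyValuePair<TKey, TValue>), which removes only when both key and value match; the limit check would still need its own synchronization.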
Step 4: Run tests
Run: dotnet test tests/ScadaLink.Communication.Tests/
Expected: All pass
Step 5: Commit
git add src/ScadaLink.Communication/Grpc/ tests/ScadaLink.Communication.Tests/
git commit -m "feat: add SiteStreamGrpcServer with Channel<T> bridge and stream limits"
Task 5: Switch Site Host to WebApplicationBuilder + gRPC
Files:
- Modify: src/ScadaLink.Host/Program.cs:157-174
- Modify: src/ScadaLink.Host/appsettings.Site.json
- Modify: src/ScadaLink.Communication/CommunicationOptions.cs
- Modify: docker/docker-compose.yml
- Test: tests/ScadaLink.Host.Tests/
Step 1: Write failing test
Site host startup test: verify WebApplicationBuilder starts, gRPC port is configured, MapGrpcService is registered.
Step 2: Switch site host from generic Host to WebApplicationBuilder
In src/ScadaLink.Host/Program.cs, replace the Host.CreateDefaultBuilder() site section with WebApplication.CreateBuilder() + Kestrel HTTP/2 on GrpcPort + AddGrpc() + MapGrpcService<SiteStreamGrpcServer>(). Set the GrpcPort endpoint to HttpProtocols.Http2, since without TLS there is no ALPN to negotiate HTTP/2. Keep all existing service registrations via SiteServiceRegistration.Configure().
Add gRPC keepalive settings from CommunicationOptions (the properties are added in Step 4) to Kestrel's HTTP/2 limits:
- KeepAlivePingDelay = 15s
- KeepAlivePingTimeout = 10s
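The Kestrel side of Step 2 might look like the following sketch, assuming a hypothetical AddSiteGrpcEndpoint helper called from Program.cs with NodeOptions.GrpcPort and the CommunicationOptions keepalive values. It needs the ASP.NET Core shared framework and Grpc.AspNetCore, so it is a configuration fragment rather than a standalone program:

```csharp
using System;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical helper; values would come from NodeOptions and CommunicationOptions.
public static class SiteGrpcHostingSketch
{
    public static void AddSiteGrpcEndpoint(
        WebApplicationBuilder builder, int grpcPort, TimeSpan pingDelay, TimeSpan pingTimeout)
    {
        builder.WebHost.ConfigureKestrel(kestrel =>
        {
            // Cleartext endpoint: HTTP/2 only, because there is no TLS/ALPN to negotiate it.
            kestrel.ListenAnyIP(grpcPort, listen => listen.Protocols = HttpProtocols.Http2);

            // Server-initiated HTTP/2 PING frames detect dead central connections.
            kestrel.Limits.Http2.KeepAlivePingDelay = pingDelay;
            kestrel.Limits.Http2.KeepAlivePingTimeout = pingTimeout;
        });

        builder.Services.AddGrpc();
    }
}
```

After builder.Build(), the site host would call app.MapGrpcService<SiteStreamGrpcServer>(). Explicit Listen calls override URLs configured elsewhere (for example ASPNETCORE_URLS), so any other HTTP endpoint the site host needs must get its own Listen call.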
Step 3: Update docker-compose.yml
Expose gRPC port 8083 for each site node:
- Site-A: 9023:8083 / 9024:8083
- Site-B: 9033:8083 / 9034:8083
- Site-C: 9043:8083 / 9044:8083
Step 4: Add gRPC keepalive config to CommunicationOptions
Add to src/ScadaLink.Communication/CommunicationOptions.cs:
```csharp
public TimeSpan GrpcKeepAlivePingDelay { get; set; } = TimeSpan.FromSeconds(15);
public TimeSpan GrpcKeepAlivePingTimeout { get; set; } = TimeSpan.FromSeconds(10);
public TimeSpan GrpcMaxStreamLifetime { get; set; } = TimeSpan.FromHours(4);
public int GrpcMaxConcurrentStreams { get; set; } = 100;
```
Step 5: Run tests and build
Run: dotnet build src/ScadaLink.Host/ && dotnet test tests/ScadaLink.Host.Tests/
Expected: All pass
Step 6: Commit
git add src/ScadaLink.Host/ src/ScadaLink.Communication/ docker/
git commit -m "feat: switch site host to WebApplicationBuilder with Kestrel gRPC server"
Task 6: Central-Side gRPC Client
Files:
- Create: src/ScadaLink.Communication/Grpc/SiteStreamGrpcClient.cs
- Create: src/ScadaLink.Communication/Grpc/SiteStreamGrpcClientFactory.cs
- Test: tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcClientTests.cs
- Test: tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcClientFactoryTests.cs
Step 1: Write failing tests for SiteStreamGrpcClient
- Client connects, reads the stream, converts proto→domain types (AttributeValueChanged, AlarmStateChanged), and invokes the callback
- Client handles stream errors (throws on RpcException)
- Client cancellation stops the background reader
Step 2: Implement SiteStreamGrpcClient
- Creates GrpcChannel with keepalive settings from CommunicationOptions
- SubscribeAsync: calls SiteStreamService.SubscribeInstance(), launches a background task to read ResponseStream, converts proto→domain, invokes the callback
- Unsubscribe: cancels the CancellationTokenSource for the subscription
- IAsyncDisposable: disposes the channel
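Two client details are easy to get wrong: keepalive is configured on the HTTP handler rather than on GrpcChannel itself, and cancelling the call during Unsubscribe should not look like a stream failure. A sketch under those assumptions, using only BCL types so it compiles without Grpc.Net.Client; SiteStreamClientSketch, ReadLoopAsync, and the callback shapes are hypothetical, and IAsyncEnumerable<TProto> stands in for call.ResponseStream.ReadAllAsync(ct):

```csharp
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

public static class SiteStreamClientSketch
{
    // Pass to GrpcChannel.ForAddress(address, new GrpcChannelOptions { HttpHandler = handler }).
    public static SocketsHttpHandler CreateHandler(TimeSpan pingDelay, TimeSpan pingTimeout) => new()
    {
        // Client-side HTTP/2 PINGs; the default KeepAlivePingPolicy (WithActiveRequests)
        // already pings while a subscription stream is open.
        KeepAlivePingDelay = pingDelay,
        KeepAlivePingTimeout = pingTimeout,
        // Keep the pooled connection between subscriptions instead of idling it out.
        PooledConnectionIdleTimeout = Timeout.InfiniteTimeSpan,
        // Open another HTTP/2 connection if one reaches its concurrent-stream limit.
        EnableMultipleHttp2Connections = true,
    };

    // Background reader started by SubscribeAsync.
    public static async Task ReadLoopAsync<TProto, TDomain>(
        IAsyncEnumerable<TProto> stream,
        Func<TProto, TDomain?> convert, // proto → domain; null means "ignore this message"
        Action<TDomain> onEvent,
        Action<Exception> onError,
        CancellationToken ct) where TDomain : class
    {
        try
        {
            await foreach (var message in stream.WithCancellation(ct))
            {
                if (convert(message) is { } domain) onEvent(domain);
            }
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            // Normal Unsubscribe/Dispose path. Grpc.Net.Client reports cancellation as
            // RpcException(StatusCode.Cancelled) unless ThrowOperationCanceledOnCancellation is set.
            return;
        }
        catch (Exception ex)
        {
            onError(ex); // e.g. an RpcException when the site node goes away
            return;
        }

        // Assumption: a stream the server completes on its own (for example after
        // GrpcMaxStreamLifetime) is reported like an error so the caller resubscribes.
        if (!ct.IsCancellationRequested)
            onError(new InvalidOperationException("Stream completed by the server."));
    }
}
```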
Step 3: Write failing tests for SiteStreamGrpcClientFactory
- Creates and caches per-site clients
- Falls back to NodeB on NodeA connection failure
- Disposes clients on site removal
Step 4: Implement SiteStreamGrpcClientFactory
- GetOrCreateAsync(siteIdentifier, grpcNodeAAddress, grpcNodeBAddress) → SiteStreamGrpcClient
- Caches by siteIdentifier in ConcurrentDictionary
- Manages GrpcChannel lifecycle
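A per-site cache has two concurrency traps: ConcurrentDictionary.GetOrAdd can invoke its value factory more than once under contention, and a failed connection attempt should not stay cached. A sketch assuming a hypothetical PerSiteClientCache that SiteStreamGrpcClientFactory could wrap, where the create delegate encapsulates the NodeA-then-NodeB fallback:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical cache; TClient stands in for SiteStreamGrpcClient.
public sealed class PerSiteClientCache<TClient> where TClient : class, IAsyncDisposable
{
    private readonly ConcurrentDictionary<string, Lazy<Task<TClient>>> _clients = new();
    private readonly Func<string, Task<TClient>> _create; // e.g. try NodeA, fall back to NodeB

    public PerSiteClientCache(Func<string, Task<TClient>> create) => _create = create;

    public async Task<TClient> GetOrCreateAsync(string siteIdentifier)
    {
        // Several Lazy wrappers may be built under contention, but only the stored one
        // is ever forced, so the client is created once per site.
        var entry = _clients.GetOrAdd(siteIdentifier, id => new Lazy<Task<TClient>>(() => _create(id)));
        try
        {
            return await entry.Value;
        }
        catch
        {
            // Don't cache failures: evict this exact entry (not a newer one) so the next call retries.
            _clients.TryRemove(new KeyValuePair<string, Lazy<Task<TClient>>>(siteIdentifier, entry));
            throw;
        }
    }

    // Called on site removal.
    public async Task RemoveAsync(string siteIdentifier)
    {
        if (!_clients.TryRemove(siteIdentifier, out var entry)) return;
        TClient client;
        try
        {
            // GetOrCreateAsync forces the Lazy right after inserting it, so this normally
            // observes a creation that is already in flight or finished.
            client = await entry.Value;
        }
        catch
        {
            return; // creation failed; nothing to dispose
        }
        await client.DisposeAsync();
    }
}
```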
Step 5: Run tests
Run: dotnet test tests/ScadaLink.Communication.Tests/
Expected: All pass
Step 6: Commit
git add src/ScadaLink.Communication/Grpc/ tests/ScadaLink.Communication.Tests/
git commit -m "feat: add SiteStreamGrpcClient and SiteStreamGrpcClientFactory"
Task 7: Update DebugStreamBridgeActor to Use gRPC
Files:
- Modify: src/ScadaLink.Communication/Actors/DebugStreamBridgeActor.cs
- Modify: src/ScadaLink.Communication/DebugStreamService.cs
- Modify: src/ScadaLink.Communication/ServiceCollectionExtensions.cs
- Test: tests/ScadaLink.Communication.Tests/
Step 1: Write failing tests for updated bridge actor
- Bridge actor sends subscribe via ClusterClient, receives snapshot
- After snapshot, opens gRPC stream via SiteStreamGrpcClient
- Events from gRPC callback forwarded to _onEvent
- On gRPC stream error: reconnects to other node with backoff (max 3 retries)
- On stop: cancels gRPC + sends unsubscribe via ClusterClient
- Handles DebugStreamTerminated idempotently
Step 2: Update DebugStreamBridgeActor
Rewrite to:
- PreStart: send SubscribeDebugViewRequest via ClusterClient (unchanged)
- On DebugViewSnapshot received: open the gRPC stream first, then apply the snapshot (per the handoff race mitigation)
- gRPC callback delivers events to Self via Tell, which marshals them onto the actor thread; capture Self in a local variable before registering the callback, because Self and Context are not accessible from the gRPC reader's thread
- On gRPC error: enter reconnecting state, try the other node with backoff, give up after max retries
- On stop: cancel gRPC subscription + send UnsubscribeDebugViewRequest
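Keeping the retry decision out of the actor makes the "other node, backoff, max 3 retries" rule unit-testable on its own. A sketch assuming a hypothetical ReconnectPolicy that alternates nodes and doubles a 1 s base delay up to a 10 s cap; the base and cap are placeholders, not values from the design:

```csharp
using System;

public enum GrpcNode { A, B }

// Hypothetical, side-effect-free policy for the bridge actor's reconnecting state.
public sealed record ReconnectPolicy(int MaxRetries = 3, double BaseDelaySeconds = 1, double MaxDelaySeconds = 10)
{
    // attempt is 1-based. Returns null once retries are exhausted,
    // at which point the actor gives up and terminates the debug stream.
    public (GrpcNode Node, TimeSpan Delay)? Next(GrpcNode failedNode, int attempt)
    {
        if (attempt < 1 || attempt > MaxRetries) return null;
        var other = failedNode == GrpcNode.A ? GrpcNode.B : GrpcNode.A;
        var seconds = Math.Min(BaseDelaySeconds * Math.Pow(2, attempt - 1), MaxDelaySeconds);
        return (other, TimeSpan.FromSeconds(seconds));
    }
}
```

Inside the actor, the reconnecting state could schedule each attempt with Akka.NET's timer scheduler (for example Timers.StartSingleTimer with a hypothetical RetryConnect message) so the retry is processed as an ordinary actor message.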
Step 3: Update DebugStreamService
Inject SiteStreamGrpcClientFactory. Resolve GrpcNodeAAddress/GrpcNodeBAddress from Site entity. Pass to bridge actor.
Step 4: Register factory in DI
In src/ScadaLink.Communication/ServiceCollectionExtensions.cs:
```csharp
services.AddSingleton<SiteStreamGrpcClientFactory>();
```
Step 5: Run tests
Run: dotnet test tests/ScadaLink.Communication.Tests/
Expected: All pass
Step 6: Commit
git add src/ScadaLink.Communication/ tests/ScadaLink.Communication.Tests/
git commit -m "feat: update DebugStreamBridgeActor to use gRPC for streaming events"
Task 8: Remove ClusterClient Streaming Path
Files:
- Modify: src/ScadaLink.SiteRuntime/Actors/InstanceActor.cs
- Modify: src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs
- Modify: src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs
- Delete: src/ScadaLink.Commons/Messages/DebugView/DebugStreamEvent.cs
- Test: tests/ScadaLink.SiteRuntime.Tests/Actors/InstanceActorIntegrationTests.cs
- Test: tests/ScadaLink.Commons.Tests/ArchitecturalConstraintTests.cs
Step 1: Remove DebugStreamEvent from InstanceActor
Remove _debugSubscriberCorrelationIds, _siteCommActor, and all DebugStreamEvent forwarding from PublishAndNotifyChildren and HandleAlarmStateChanged. InstanceActor just publishes to SiteStreamManager — the gRPC server picks up events from there.
Keep HandleSubscribeDebugView (for snapshot) and HandleUnsubscribeDebugView.
Step 2: Remove DebugStreamEvent from SiteCommunicationActor
Remove Receive<DebugStreamEvent> handler.
Step 3: Remove DebugStreamEvent from CentralCommunicationActor
Remove Receive<DebugStreamEvent> handler and HandleDebugStreamEvent method.
Step 4: Delete DebugStreamEvent.cs
Delete src/ScadaLink.Commons/Messages/DebugView/DebugStreamEvent.cs.
Step 5: Update InstanceActorIntegrationTests
Remove DebugStreamEventForwarder test helper. Update debug subscriber tests to verify events reach SiteStreamManager only.
Step 6: Add architectural constraint test
In tests/ScadaLink.Commons.Tests/ArchitecturalConstraintTests.cs, add test verifying DebugStreamEvent type no longer exists in the Commons assembly.
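A sketch of the reflection check, assuming the test anchors on a type known to live in Commons (for example the Site entity from Task 2). Matching on the simple name catches the type even if it reappears under another namespace; ArchitectureChecks is a hypothetical helper:

```csharp
using System.Linq;
using System.Reflection;

public static class ArchitectureChecks
{
    // True if any type in the assembly (public or not, nested or not, any namespace)
    // has the given simple name.
    public static bool ContainsTypeNamed(Assembly assembly, string typeName) =>
        assembly.GetTypes().Any(t => t.Name == typeName);
}
```

In the xUnit test this becomes Assert.False(ArchitectureChecks.ContainsTypeNamed(typeof(Site).Assembly, "DebugStreamEvent")), which guards against the type being reintroduced after Step 4 deletes it.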
Step 7: Run full test suite
Run: dotnet test tests/ScadaLink.SiteRuntime.Tests/ && dotnet test tests/ScadaLink.Communication.Tests/ && dotnet test tests/ScadaLink.Commons.Tests/ && dotnet test tests/ScadaLink.Host.Tests/
Expected: All pass
Step 8: Commit
git add -A
git commit -m "refactor: remove ClusterClient streaming path (DebugStreamEvent), events flow via gRPC"
Task 9: Docker & End-to-End Integration Test
Files:
- Modify: docker/docker-compose.yml
- Modify: docker/deploy.sh (if needed)
- Create: tests/ScadaLink.IntegrationTests/Grpc/GrpcStreamIntegrationTests.cs
Step 1: Update docker-compose site appsettings
Ensure site container configs include GrpcPort: 8083 and gRPC ports are exposed.
Step 2: Write integration test
End-to-end: start in-process site gRPC server → central gRPC client → verify event delivery, cancellation cleanup.
Step 3: Build and deploy cluster
Run: bash docker/deploy.sh
Expected: All containers start, gRPC ports exposed
Step 4: Manual end-to-end verification
Run: timeout 35 dotnet run --project src/ScadaLink.CLI -- --url http://localhost:9000 --username multi-role --password password debug stream --id 1 --format table
Expected: Initial snapshot + streaming ATTR/ALARM rows via gRPC (not ClusterClient).
Write an OPC UA tag to verify:
python3 infra/tools/opcua_tool.py write --node "ns=3;s=JoeAppEngine.BTCS" --value "gRPC streaming test" --type String
Step 5: Commit
git add tests/ScadaLink.IntegrationTests/ docker/
git commit -m "test: add gRPC stream integration test and docker config"
Task 10: Documentation Updates
Files:
- Modify: docs/requirements/HighLevelReqs.md
- Modify: docs/requirements/Component-Communication.md
- Modify: docs/requirements/Component-SiteRuntime.md
- Modify: docs/requirements/Component-Host.md
- Modify: docs/requirements/Component-CentralUI.md
- Modify: docs/requirements/Component-CLI.md
- Modify: docs/requirements/Component-ConfigurationDatabase.md
- Modify: docs/requirements/Component-ClusterInfrastructure.md
- Modify: CLAUDE.md
- Modify: README.md
- Modify: docker/README.md
Step 1: Update HighLevelReqs.md
Section 5 (Communication): Add gRPC streaming transport. ClusterClient for command/control, gRPC for real-time data.
Step 2: Update Component-Communication.md
Pattern 6: Replace ClusterClient streaming with gRPC. Add SiteStreamGrpcServer, SiteStreamGrpcClient, SiteStreamGrpcClientFactory. Add gRPC keepalive config.
Step 3: Update remaining component docs
Per the documentation update table in docs/plans/grpc_streams.md § Documentation Updates.
Step 4: Update CLAUDE.md
Add under Data & Communication: "gRPC streaming for site→central real-time data; ClusterClient for command/control only"
Step 5: Update README.md architecture diagram
Add gRPC streaming channel between site and central in the ASCII diagram.
Step 6: Update docker/README.md
Add gRPC ports to port allocation table.
Step 7: Commit
git add docs/ CLAUDE.md README.md docker/README.md
git commit -m "docs: update requirements and architecture for gRPC streaming channel"
Task 11: Final Guardrail Tests
Files:
- Test: tests/ScadaLink.Communication.Tests/Grpc/ProtoContractTests.cs
- Test: tests/ScadaLink.Communication.Tests/Grpc/CleanupVerificationTests.cs
Step 1: Proto contract test
Verify all oneof variants in SiteStreamEvent have corresponding handlers in StreamRelayActor and ConvertToDomainEvent. If a new proto field is added without handlers, the test fails.
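One way to make that test fail automatically when a variant is added is to enumerate the generated case enum rather than list the cases by hand. For oneof event, the C# generator emits SiteStreamEvent.EventOneofCase (None plus one member per field, numbered by field number). The sketch mirrors that enum locally so it compiles on its own, and assumes the test can collect the set of cases each handler covers, for instance by building a sample message per case and checking the converter accepts it; OneofCoverage is hypothetical:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Local stand-in for the generated SiteStreamEvent.EventOneofCase
// (attribute_changed = 2, alarm_changed = 3). Use the generated enum in the real test.
public enum EventOneofCase { None = 0, AttributeChanged = 2, AlarmChanged = 3 }

public static class OneofCoverage
{
    // Every non-None case of TCase absent from `handled`, in ascending value order.
    public static IReadOnlyList<TCase> Missing<TCase>(IEnumerable<TCase> handled) where TCase : struct, Enum
    {
        var covered = new HashSet<TCase>(handled);
        return Enum.GetValues<TCase>()
            .Where(c => Convert.ToInt64(c) != 0 && !covered.Contains(c)) // 0 is the "None" case
            .ToList();
    }
}
```

Running Missing once for StreamRelayActor's cases and once for ConvertToDomainEvent's cases and asserting both are empty gives the failure the step describes as soon as the proto grows a new variant.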
Step 2: Cleanup verification test
Verify that after gRPC stream cancellation, SiteStreamManager.SubscriptionCount returns to zero (no leaked subscriptions).
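Because SiteStreamGrpcServer removes the subscription in a finally block that runs after the cancellation is observed, the count reaches zero asynchronously, so the assertion should poll instead of checking once. A minimal polling helper (the Eventually class is hypothetical); inside an Akka.TestKit test, TestKit's AwaitCondition serves the same purpose:

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

public static class Eventually
{
    // Polls until condition() returns true or the timeout elapses; returns the last result.
    public static async Task<bool> WaitUntilAsync(Func<bool> condition, TimeSpan timeout, TimeSpan? pollInterval = null)
    {
        var interval = pollInterval ?? TimeSpan.FromMilliseconds(50);
        var stopwatch = Stopwatch.StartNew();
        while (stopwatch.Elapsed < timeout)
        {
            if (condition()) return true;
            await Task.Delay(interval);
        }
        return condition(); // one final check at the deadline
    }
}
```

The test would then read Assert.True(await Eventually.WaitUntilAsync(() => siteStreamManager.SubscriptionCount == 0, TimeSpan.FromSeconds(5))).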
Step 3: No ClusterClient streaming regression test
Integration test that subscribes via gRPC, triggers changes, and verifies events arrive via gRPC — NOT via DebugStreamEvent.
Step 4: Run full test suite
Run: dotnet test tests/ScadaLink.Host.Tests/ && dotnet test tests/ScadaLink.Communication.Tests/ && dotnet test tests/ScadaLink.SiteRuntime.Tests/ && dotnet test tests/ScadaLink.Commons.Tests/ && dotnet test tests/ScadaLink.CLI.Tests/ && dotnet test tests/ScadaLink.ManagementService.Tests/
Expected: All pass, zero warnings
Step 5: Commit
git add tests/
git commit -m "test: add proto contract, cleanup verification, and regression guardrail tests"