feat: add SiteStreamGrpcServer with Channel<T> bridge and stream limits

- Define ISiteStreamSubscriber interface for decoupling from SiteRuntime
- Implement SiteStreamGrpcServer (inherits SiteStreamServiceBase) with:
  - Readiness gate (SetReady)
  - Max concurrent stream enforcement
  - Duplicate correlationId replacement (cancels previous stream)
  - StreamRelayActor creation per subscription
  - Bounded Channel<SiteStreamEvent> bridge (1000 capacity, drop-oldest)
  - Clean teardown: unsubscribe, stop actor, remove tracking entry
- Identity-safe cleanup using ConcurrentDictionary.TryRemove(KeyValuePair)
  to prevent replacement streams from being removed by predecessor cleanup
- 7 unit tests covering reject-not-ready, max-streams, duplicate cancel,
  cleanup-on-cancel, subscribe/remove lifecycle, event forwarding
This commit is contained in:
Joseph Doherty
2026-03-21 11:52:31 -04:00
parent d70bbbe739
commit 55a05914d0
3 changed files with 375 additions and 0 deletions

View File

@@ -0,0 +1,22 @@
using Akka.Actor;
namespace ScadaLink.Communication.Grpc;
/// <summary>
/// Abstraction over the site-side stream subscription mechanism.
/// SiteStreamManager in the SiteRuntime project implements this interface;
/// the gRPC server depends on it without referencing SiteRuntime directly.
/// </summary>
public interface ISiteStreamSubscriber
{
/// <summary>
/// Subscribes an actor to receive filtered stream events for a specific instance.
/// </summary>
/// <param name="instanceName">Name of the instance whose stream events should be delivered.</param>
/// <param name="subscriber">Actor that the events are delivered to.</param>
/// <returns>A subscription ID that can be used for unsubscription.</returns>
/// <remarks>
/// NOTE(review): no member of this interface accepts the returned ID; removal
/// goes through <see cref="RemoveSubscriber"/> keyed by actor reference. Confirm
/// whether ID-based unsubscription lives on the implementing type.
/// </remarks>
string Subscribe(string instanceName, IActorRef subscriber);
/// <summary>
/// Removes all subscriptions for the given actor.
/// </summary>
/// <param name="subscriber">Actor reference previously passed to <see cref="Subscribe"/>.</param>
void RemoveSubscriber(IActorRef subscriber);
}

View File

@@ -0,0 +1,116 @@
using System.Collections.Concurrent;
using System.Threading.Channels;
using Akka.Actor;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using GrpcStatus = Grpc.Core.Status;
namespace ScadaLink.Communication.Grpc;
/// <summary>
/// gRPC service that accepts instance stream subscriptions from central nodes.
/// Creates a StreamRelayActor per subscription to bridge Akka domain events
/// through a Channel&lt;T&gt; to the gRPC response stream.
/// </summary>
/// <remarks>
/// Each call to <see cref="SubscribeInstance"/> owns its own
/// <see cref="CancellationTokenSource"/>: other calls may cancel it (duplicate
/// replacement) but only the owning call disposes it.
/// </remarks>
public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
{
    /// <summary>
    /// Capacity of the per-stream event buffer. When it is full the oldest
    /// buffered event is dropped to make room for the newest one.
    /// </summary>
    private const int ChannelCapacity = 1000;

    private readonly ISiteStreamSubscriber _streamSubscriber;
    private readonly ActorSystem _actorSystem;
    private readonly ILogger<SiteStreamGrpcServer> _logger;
    private readonly ConcurrentDictionary<string, StreamEntry> _activeStreams = new();

    // Serializes "replace duplicate + check limit + insert" so concurrent calls
    // cannot exceed the limit or overwrite each other without cancellation.
    private readonly object _registrationGate = new();

    private readonly int _maxConcurrentStreams;
    private volatile bool _ready;
    private long _actorCounter;

    /// <summary>
    /// Creates the gRPC stream server.
    /// </summary>
    /// <param name="streamSubscriber">Subscription mechanism that relay actors are registered with.</param>
    /// <param name="actorSystem">Actor system used to create and stop relay actors.</param>
    /// <param name="logger">Logger for stream lifecycle messages.</param>
    /// <param name="maxConcurrentStreams">Maximum number of simultaneously tracked streams.</param>
    public SiteStreamGrpcServer(
        ISiteStreamSubscriber streamSubscriber,
        ActorSystem actorSystem,
        ILogger<SiteStreamGrpcServer> logger,
        int maxConcurrentStreams = 100)
    {
        _streamSubscriber = streamSubscriber;
        _actorSystem = actorSystem;
        _logger = logger;
        _maxConcurrentStreams = maxConcurrentStreams;
    }

    /// <summary>
    /// Marks the server as ready to accept subscriptions.
    /// Called after the site runtime is fully initialized.
    /// </summary>
    public void SetReady() => _ready = true;

    /// <summary>
    /// Number of currently active streaming subscriptions. Exposed for diagnostics.
    /// </summary>
    public int ActiveStreamCount => _activeStreams.Count;

    /// <summary>
    /// Streams events for one instance to the caller until the call is cancelled
    /// or a newer subscription with the same correlation ID replaces it.
    /// </summary>
    /// <param name="request">Carries the correlation ID and the instance name to subscribe to.</param>
    /// <param name="responseStream">gRPC stream the events are written to.</param>
    /// <param name="context">Call context; its cancellation token ends the stream.</param>
    /// <exception cref="RpcException">
    /// <see cref="StatusCode.Unavailable"/> when <see cref="SetReady"/> has not been called;
    /// <see cref="StatusCode.ResourceExhausted"/> when the stream limit is reached.
    /// </exception>
    public override async Task SubscribeInstance(
        InstanceStreamRequest request,
        IServerStreamWriter<SiteStreamEvent> responseStream,
        ServerCallContext context)
    {
        if (!_ready)
            throw new RpcException(new GrpcStatus(StatusCode.Unavailable, "Server not ready"));

        using var streamCts = CancellationTokenSource.CreateLinkedTokenSource(context.CancellationToken);

        // Read the token once. A CancellationToken stays usable after its source is
        // disposed, whereas CancellationTokenSource.Token throws once disposed.
        var streamToken = streamCts.Token;
        var entry = new StreamEntry(streamCts);

        RegisterStream(request.CorrelationId, entry);

        var channel = Channel.CreateBounded<SiteStreamEvent>(
            new BoundedChannelOptions(ChannelCapacity) { FullMode = BoundedChannelFullMode.DropOldest });

        try
        {
            // Everything after registration runs inside try/finally so a failure while
            // creating the actor or subscribing cannot leave a stale tracking entry.
            var actorSeq = Interlocked.Increment(ref _actorCounter);
            var relayActor = _actorSystem.ActorOf(
                Props.Create(typeof(Actors.StreamRelayActor), request.CorrelationId, channel.Writer),
                $"stream-relay-{request.CorrelationId}-{actorSeq}");

            try
            {
                var subscriptionId = _streamSubscriber.Subscribe(request.InstanceUniqueName, relayActor);

                _logger.LogInformation(
                    "Stream {CorrelationId} started for {Instance} (subscription {SubscriptionId})",
                    request.CorrelationId, request.InstanceUniqueName, subscriptionId);

                await foreach (var evt in channel.Reader.ReadAllAsync(streamToken))
                {
                    await responseStream.WriteAsync(evt, streamToken);
                }
            }
            catch (OperationCanceledException)
            {
                // Normal cancellation (client disconnect or duplicate replacement)
            }
            finally
            {
                _streamSubscriber.RemoveSubscriber(relayActor);
                _actorSystem.Stop(relayActor);
            }
        }
        finally
        {
            channel.Writer.TryComplete();

            // Only remove our own entry -- a replacement stream may have already taken the slot.
            // StreamEntry equality compares the Cts reference, which is unique per call.
            _activeStreams.TryRemove(
                new KeyValuePair<string, StreamEntry>(request.CorrelationId, entry));

            _logger.LogInformation(
                "Stream {CorrelationId} for {Instance} ended",
                request.CorrelationId, request.InstanceUniqueName);
        }
    }

    /// <summary>
    /// Atomically replaces any existing stream with the same correlation ID,
    /// enforces the concurrent-stream limit and records the new entry.
    /// </summary>
    /// <exception cref="RpcException">
    /// <see cref="StatusCode.ResourceExhausted"/> when the limit is reached.
    /// </exception>
    private void RegisterStream(string correlationId, StreamEntry entry)
    {
        StreamEntry? replaced;
        bool accepted;

        lock (_registrationGate)
        {
            // Duplicate prevention -- take over the slot of an existing stream with this id.
            _activeStreams.TryRemove(correlationId, out replaced);

            // Check max concurrent streams after duplicate removal.
            accepted = _activeStreams.Count < _maxConcurrentStreams;
            if (accepted)
                _activeStreams[correlationId] = entry;
        }

        // Cancel outside the lock: cancellation callbacks run synchronously on this thread.
        if (replaced is not null)
            CancelReplacedStream(replaced);

        if (!accepted)
            throw new RpcException(new GrpcStatus(StatusCode.ResourceExhausted, "Max concurrent streams reached"));
    }

    /// <summary>
    /// Signals a replaced stream to stop. The source is not disposed here; the
    /// call that created it disposes it when it unwinds.
    /// </summary>
    private static void CancelReplacedStream(StreamEntry replaced)
    {
        try
        {
            replaced.Cts.Cancel();
        }
        catch (ObjectDisposedException)
        {
            // The replaced call finished and disposed its source concurrently; nothing left to cancel.
        }
    }

    /// <summary>
    /// Tracks a single active stream so cleanup only removes its own entry.
    /// </summary>
    private sealed record StreamEntry(CancellationTokenSource Cts);
}

View File

@@ -0,0 +1,237 @@
using System.Threading.Channels;
using Akka.Actor;
using Akka.TestKit.Xunit2;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using NSubstitute;
using ScadaLink.Communication.Grpc;
namespace ScadaLink.Communication.Tests.Grpc;
/// <summary>
/// Unit tests for <see cref="SiteStreamGrpcServer"/>: readiness gate, stream limit,
/// duplicate replacement, teardown and event forwarding.
/// </summary>
public class SiteStreamGrpcServerTests : TestKit
{
    private readonly ISiteStreamSubscriber _subscriber;
    private readonly ILogger<SiteStreamGrpcServer> _logger;

    public SiteStreamGrpcServerTests()
    {
        _subscriber = Substitute.For<ISiteStreamSubscriber>();
        _subscriber.Subscribe(Arg.Any<string>(), Arg.Any<IActorRef>())
            .Returns("sub-1");
        _logger = NullLogger<SiteStreamGrpcServer>.Instance;
    }

    /// <summary>Builds a server backed by the TestKit actor system and the substitute subscriber.</summary>
    private SiteStreamGrpcServer CreateServer(int maxStreams = 100)
    {
        return new SiteStreamGrpcServer(_subscriber, Sys, _logger, maxStreams);
    }

    private static InstanceStreamRequest MakeRequest(string correlationId = "corr-1", string instance = "Site1.Pump01")
    {
        return new InstanceStreamRequest
        {
            CorrelationId = correlationId,
            InstanceUniqueName = instance
        };
    }

    [Fact]
    public async Task RejectsWhenNotReady()
    {
        var server = CreateServer();
        // Do NOT call SetReady()
        var writer = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
        var context = CreateMockContext();

        var ex = await Assert.ThrowsAsync<RpcException>(
            () => server.SubscribeInstance(MakeRequest(), writer, context));

        Assert.Equal(StatusCode.Unavailable, ex.StatusCode);
    }

    [Fact]
    public async Task RejectsWhenMaxStreamsReached()
    {
        var server = CreateServer(maxStreams: 1);
        server.SetReady();

        // Start one stream that blocks
        using var cts1 = new CancellationTokenSource();
        var context1 = CreateMockContext(cts1.Token);
        var writer1 = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
        var stream1Task = Task.Run(() => server.SubscribeInstance(
            MakeRequest("corr-1"), writer1, context1));

        try
        {
            // Wait for the first stream to register
            await WaitForConditionAsync(() => server.ActiveStreamCount == 1);

            // Second stream should be rejected
            var writer2 = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
            var context2 = CreateMockContext();
            var ex = await Assert.ThrowsAsync<RpcException>(
                () => server.SubscribeInstance(MakeRequest("corr-2"), writer2, context2));

            Assert.Equal(StatusCode.ResourceExhausted, ex.StatusCode);
        }
        finally
        {
            // Clean up first stream even when an assertion above fails
            cts1.Cancel();
            await stream1Task;
        }
    }

    [Fact]
    public async Task CancelsDuplicateCorrelationId()
    {
        var server = CreateServer();
        server.SetReady();

        using var cts1 = new CancellationTokenSource();
        var context1 = CreateMockContext(cts1.Token);
        var writer1 = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();

        // Start first stream
        var stream1Task = Task.Run(() => server.SubscribeInstance(
            MakeRequest("corr-dup"), writer1, context1));
        await WaitForConditionAsync(() => server.ActiveStreamCount == 1);

        // Start second stream with same correlationId -- should cancel first
        using var cts2 = new CancellationTokenSource();
        var context2 = CreateMockContext(cts2.Token);
        var writer2 = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
        var stream2Task = Task.Run(() => server.SubscribeInstance(
            MakeRequest("corr-dup"), writer2, context2));

        // First stream should complete (cancelled by duplicate replacement)
        await stream1Task;

        // Second stream should be active
        await WaitForConditionAsync(() => server.ActiveStreamCount == 1);

        // Clean up
        cts2.Cancel();
        await stream2Task;
    }

    [Fact]
    public async Task CleansUpOnCancellation()
    {
        var server = CreateServer();
        server.SetReady();

        using var cts = new CancellationTokenSource();
        var context = CreateMockContext(cts.Token);
        var writer = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
        var streamTask = Task.Run(() => server.SubscribeInstance(
            MakeRequest("corr-cleanup"), writer, context));
        await WaitForConditionAsync(() => server.ActiveStreamCount == 1);

        cts.Cancel();
        await streamTask;

        Assert.Equal(0, server.ActiveStreamCount);
    }

    [Fact]
    public async Task SubscribesAndRemovesFromStreamManager()
    {
        var server = CreateServer();
        server.SetReady();

        using var cts = new CancellationTokenSource();
        var context = CreateMockContext(cts.Token);
        var writer = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
        var streamTask = Task.Run(() => server.SubscribeInstance(
            MakeRequest("corr-sub", "Site1.Motor01"), writer, context));
        await WaitForConditionAsync(() => server.ActiveStreamCount == 1);

        // Verify Subscribe was called
        _subscriber.Received(1).Subscribe("Site1.Motor01", Arg.Any<IActorRef>());

        cts.Cancel();
        await streamTask;

        // Verify RemoveSubscriber was called
        _subscriber.Received(1).RemoveSubscriber(Arg.Any<IActorRef>());
    }

    [Fact]
    public async Task WritesEventsToResponseStream()
    {
        var server = CreateServer();
        server.SetReady();

        // Capture the relay actor so we can send it events. The callback runs on the
        // server's thread, so publish/read the reference with Volatile.
        IActorRef? capturedActor = null;
        _subscriber.Subscribe(Arg.Any<string>(), Arg.Any<IActorRef>())
            .Returns(ci =>
            {
                Volatile.Write(ref capturedActor, ci.Arg<IActorRef>());
                return "sub-write";
            });

        using var cts = new CancellationTokenSource();
        var context = CreateMockContext(cts.Token);
        var writer = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();

        // Written on the server's thread and polled here, so it must be thread-safe.
        var writtenEvents = new System.Collections.Concurrent.ConcurrentQueue<SiteStreamEvent>();
        writer.WriteAsync(Arg.Any<SiteStreamEvent>(), Arg.Any<CancellationToken>())
            .Returns(Task.CompletedTask)
            .AndDoes(ci => writtenEvents.Enqueue(ci.Arg<SiteStreamEvent>()));

        var streamTask = Task.Run(() => server.SubscribeInstance(
            MakeRequest("corr-write", "Site1.Pump01"), writer, context));
        await WaitForConditionAsync(() => Volatile.Read(ref capturedActor) != null);

        // Send a domain event to the relay actor
        var ts = DateTimeOffset.UtcNow;
        var relayActor = Volatile.Read(ref capturedActor)!;
        relayActor.Tell(new Commons.Messages.Streaming.AttributeValueChanged(
            "Site1.Pump01", "Path", "Attr", 99.5, "Good", ts));

        // Wait for event to be written
        await WaitForConditionAsync(() => !writtenEvents.IsEmpty);

        var written = Assert.Single(writtenEvents);
        Assert.Equal("corr-write", written.CorrelationId);
        Assert.Equal(SiteStreamEvent.EventOneofCase.AttributeChanged, written.EventCase);

        cts.Cancel();
        await streamTask;
    }

    [Fact]
    public async Task SetReady_AllowsStreamCreation()
    {
        var server = CreateServer();
        Assert.Equal(0, server.ActiveStreamCount);

        server.SetReady();

        // After SetReady a subscription must be accepted and tracked.
        using var cts = new CancellationTokenSource();
        var context = CreateMockContext(cts.Token);
        var writer = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
        var streamTask = Task.Run(() => server.SubscribeInstance(
            MakeRequest("corr-ready"), writer, context));

        await WaitForConditionAsync(() => server.ActiveStreamCount == 1);

        cts.Cancel();
        await streamTask;
    }

    /// <summary>Creates a substitute call context whose cancellation token is <paramref name="cancellationToken"/>.</summary>
    private static ServerCallContext CreateMockContext(CancellationToken cancellationToken = default)
    {
        var context = Substitute.For<ServerCallContext>();
        context.CancellationToken.Returns(cancellationToken);
        return context;
    }

    /// <summary>
    /// Polls <paramref name="condition"/> every 25 ms until it holds or
    /// <paramref name="timeoutMs"/> elapses, then asserts that it holds.
    /// </summary>
    private static async Task WaitForConditionAsync(Func<bool> condition, int timeoutMs = 5000)
    {
        // Stopwatch is monotonic, so wall-clock adjustments cannot shorten or extend the wait.
        var elapsed = System.Diagnostics.Stopwatch.StartNew();
        while (!condition() && elapsed.ElapsedMilliseconds < timeoutMs)
        {
            await Task.Delay(25);
        }

        Assert.True(condition(), $"Condition not met within {timeoutMs}ms");
    }
}