diff --git a/src/ScadaLink.Communication/Grpc/ISiteStreamSubscriber.cs b/src/ScadaLink.Communication/Grpc/ISiteStreamSubscriber.cs new file mode 100644 index 0000000..438131c --- /dev/null +++ b/src/ScadaLink.Communication/Grpc/ISiteStreamSubscriber.cs @@ -0,0 +1,22 @@ +using Akka.Actor; + +namespace ScadaLink.Communication.Grpc; + +/// +/// Abstraction over the site-side stream subscription mechanism. +/// SiteStreamManager in the SiteRuntime project implements this interface; +/// the gRPC server depends on it without referencing SiteRuntime directly. +/// +public interface ISiteStreamSubscriber +{ + /// + /// Subscribes an actor to receive filtered stream events for a specific instance. + /// + /// A subscription ID that can be used for unsubscription. + string Subscribe(string instanceName, IActorRef subscriber); + + /// + /// Removes all subscriptions for the given actor. + /// + void RemoveSubscriber(IActorRef subscriber); +} diff --git a/src/ScadaLink.Communication/Grpc/SiteStreamGrpcServer.cs b/src/ScadaLink.Communication/Grpc/SiteStreamGrpcServer.cs new file mode 100644 index 0000000..d2b4eac --- /dev/null +++ b/src/ScadaLink.Communication/Grpc/SiteStreamGrpcServer.cs @@ -0,0 +1,116 @@ +using System.Collections.Concurrent; +using System.Threading.Channels; +using Akka.Actor; +using Grpc.Core; +using Microsoft.Extensions.Logging; +using GrpcStatus = Grpc.Core.Status; + +namespace ScadaLink.Communication.Grpc; + +/// +/// gRPC service that accepts instance stream subscriptions from central nodes. +/// Creates a StreamRelayActor per subscription to bridge Akka domain events +/// through a Channel<T> to the gRPC response stream. +/// +public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase +{ + private readonly ISiteStreamSubscriber _streamSubscriber; + private readonly ActorSystem _actorSystem; + private readonly ILogger _logger; + private readonly ConcurrentDictionary _activeStreams = new(); + private readonly int _maxConcurrentStreams; + private volatile bool _ready; + private long _actorCounter; + + public SiteStreamGrpcServer( + ISiteStreamSubscriber streamSubscriber, + ActorSystem actorSystem, + ILogger logger, + int maxConcurrentStreams = 100) + { + _streamSubscriber = streamSubscriber; + _actorSystem = actorSystem; + _logger = logger; + _maxConcurrentStreams = maxConcurrentStreams; + } + + /// + /// Marks the server as ready to accept subscriptions. + /// Called after the site runtime is fully initialized. + /// + public void SetReady() => _ready = true; + + /// + /// Number of currently active streaming subscriptions. Exposed for diagnostics. + /// + public int ActiveStreamCount => _activeStreams.Count; + + public override async Task SubscribeInstance( + InstanceStreamRequest request, + IServerStreamWriter responseStream, + ServerCallContext context) + { + if (!_ready) + throw new RpcException(new GrpcStatus(StatusCode.Unavailable, "Server not ready")); + + // Duplicate prevention -- cancel existing stream for this correlationId + if (_activeStreams.TryRemove(request.CorrelationId, out var existingEntry)) + { + existingEntry.Cts.Cancel(); + existingEntry.Cts.Dispose(); + } + + // Check max concurrent streams after duplicate removal + if (_activeStreams.Count >= _maxConcurrentStreams) + throw new RpcException(new GrpcStatus(StatusCode.ResourceExhausted, "Max concurrent streams reached")); + + using var streamCts = CancellationTokenSource.CreateLinkedTokenSource(context.CancellationToken); + var entry = new StreamEntry(streamCts); + _activeStreams[request.CorrelationId] = entry; + + var channel = Channel.CreateBounded( + new BoundedChannelOptions(1000) { FullMode = BoundedChannelFullMode.DropOldest }); + + var actorSeq = Interlocked.Increment(ref _actorCounter); + var relayActor = _actorSystem.ActorOf( + Props.Create(typeof(Actors.StreamRelayActor), request.CorrelationId, channel.Writer), + $"stream-relay-{request.CorrelationId}-{actorSeq}"); + + var subscriptionId = _streamSubscriber.Subscribe(request.InstanceUniqueName, relayActor); + + _logger.LogInformation( + "Stream {CorrelationId} started for {Instance} (subscription {SubscriptionId})", + request.CorrelationId, request.InstanceUniqueName, subscriptionId); + + try + { + await foreach (var evt in channel.Reader.ReadAllAsync(streamCts.Token)) + { + await responseStream.WriteAsync(evt, streamCts.Token); + } + } + catch (OperationCanceledException) + { + // Normal cancellation (client disconnect or duplicate replacement) + } + finally + { + _streamSubscriber.RemoveSubscriber(relayActor); + _actorSystem.Stop(relayActor); + channel.Writer.TryComplete(); + + // Only remove our own entry -- a replacement stream may have already taken the slot + _activeStreams.TryRemove( + new KeyValuePair(request.CorrelationId, entry)); + + _logger.LogInformation( + "Stream {CorrelationId} for {Instance} ended", + request.CorrelationId, request.InstanceUniqueName); + } + } + + /// + /// Tracks a single active stream so cleanup only removes its own entry. + /// + private sealed record StreamEntry(CancellationTokenSource Cts); +} diff --git a/tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcServerTests.cs b/tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcServerTests.cs new file mode 100644 index 0000000..5caae29 --- /dev/null +++ b/tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcServerTests.cs @@ -0,0 +1,237 @@ +using System.Threading.Channels; +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Grpc.Core; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using NSubstitute; +using ScadaLink.Communication.Grpc; + +namespace ScadaLink.Communication.Tests.Grpc; + +public class SiteStreamGrpcServerTests : TestKit +{ + private readonly ISiteStreamSubscriber _subscriber; + private readonly ILogger _logger; + + public SiteStreamGrpcServerTests() + { + _subscriber = Substitute.For(); + _subscriber.Subscribe(Arg.Any(), Arg.Any()) + .Returns("sub-1"); + _logger = NullLogger.Instance; + } + + private SiteStreamGrpcServer CreateServer(int maxStreams = 100) + { + return new SiteStreamGrpcServer(_subscriber, Sys, _logger, maxStreams); + } + + private static InstanceStreamRequest MakeRequest(string correlationId = "corr-1", string instance = "Site1.Pump01") + { + return new InstanceStreamRequest + { + CorrelationId = correlationId, + InstanceUniqueName = instance + }; + } + + [Fact] + public async Task RejectsWhenNotReady() + { + var server = CreateServer(); + // Do NOT call SetReady() + + var writer = Substitute.For>(); + var context = CreateMockContext(); + + var ex = await Assert.ThrowsAsync( + () => server.SubscribeInstance(MakeRequest(), writer, context)); + + Assert.Equal(StatusCode.Unavailable, ex.StatusCode); + } + + [Fact] + public async Task RejectsWhenMaxStreamsReached() + { + var server = CreateServer(maxStreams: 1); + server.SetReady(); + + // Start one stream that blocks + var cts1 = new CancellationTokenSource(); + var context1 = CreateMockContext(cts1.Token); + var writer1 = Substitute.For>(); + + var stream1Task = Task.Run(() => server.SubscribeInstance( + MakeRequest("corr-1"), writer1, context1)); + + // Wait for the first stream to register + await WaitForConditionAsync(() => server.ActiveStreamCount == 1); + + // Second stream should be rejected + var writer2 = Substitute.For>(); + var context2 = CreateMockContext(); + + var ex = await Assert.ThrowsAsync( + () => server.SubscribeInstance(MakeRequest("corr-2"), writer2, context2)); + + Assert.Equal(StatusCode.ResourceExhausted, ex.StatusCode); + + // Clean up first stream + cts1.Cancel(); + await stream1Task; + } + + [Fact] + public async Task CancelsDuplicateCorrelationId() + { + var server = CreateServer(); + server.SetReady(); + + var cts1 = new CancellationTokenSource(); + var context1 = CreateMockContext(cts1.Token); + var writer1 = Substitute.For>(); + + // Start first stream + var stream1Task = Task.Run(() => server.SubscribeInstance( + MakeRequest("corr-dup"), writer1, context1)); + + await WaitForConditionAsync(() => server.ActiveStreamCount == 1); + + // Start second stream with same correlationId -- should cancel first + var cts2 = new CancellationTokenSource(); + var context2 = CreateMockContext(cts2.Token); + var writer2 = Substitute.For>(); + + var stream2Task = Task.Run(() => server.SubscribeInstance( + MakeRequest("corr-dup"), writer2, context2)); + + // First stream should complete (cancelled by duplicate replacement) + await stream1Task; + + // Second stream should be active + await WaitForConditionAsync(() => server.ActiveStreamCount == 1); + + // Clean up + cts2.Cancel(); + await stream2Task; + } + + [Fact] + public async Task CleansUpOnCancellation() + { + var server = CreateServer(); + server.SetReady(); + + var cts = new CancellationTokenSource(); + var context = CreateMockContext(cts.Token); + var writer = Substitute.For>(); + + var streamTask = Task.Run(() => server.SubscribeInstance( + MakeRequest("corr-cleanup"), writer, context)); + + await WaitForConditionAsync(() => server.ActiveStreamCount == 1); + + cts.Cancel(); + await streamTask; + + Assert.Equal(0, server.ActiveStreamCount); + } + + [Fact] + public async Task SubscribesAndRemovesFromStreamManager() + { + var server = CreateServer(); + server.SetReady(); + + var cts = new CancellationTokenSource(); + var context = CreateMockContext(cts.Token); + var writer = Substitute.For>(); + + var streamTask = Task.Run(() => server.SubscribeInstance( + MakeRequest("corr-sub", "Site1.Motor01"), writer, context)); + + await WaitForConditionAsync(() => server.ActiveStreamCount == 1); + + // Verify Subscribe was called + _subscriber.Received(1).Subscribe("Site1.Motor01", Arg.Any()); + + cts.Cancel(); + await streamTask; + + // Verify RemoveSubscriber was called + _subscriber.Received(1).RemoveSubscriber(Arg.Any()); + } + + [Fact] + public async Task WritesEventsToResponseStream() + { + var server = CreateServer(); + server.SetReady(); + + // Capture the relay actor so we can send it events + IActorRef? capturedActor = null; + _subscriber.Subscribe(Arg.Any(), Arg.Any()) + .Returns(ci => + { + capturedActor = ci.Arg(); + return "sub-write"; + }); + + var cts = new CancellationTokenSource(); + var context = CreateMockContext(cts.Token); + var writer = Substitute.For>(); + var writtenEvents = new List(); + writer.WriteAsync(Arg.Any(), Arg.Any()) + .Returns(Task.CompletedTask) + .AndDoes(ci => writtenEvents.Add(ci.Arg())); + + var streamTask = Task.Run(() => server.SubscribeInstance( + MakeRequest("corr-write", "Site1.Pump01"), writer, context)); + + await WaitForConditionAsync(() => capturedActor != null); + + // Send a domain event to the relay actor + var ts = DateTimeOffset.UtcNow; + capturedActor!.Tell(new Commons.Messages.Streaming.AttributeValueChanged( + "Site1.Pump01", "Path", "Attr", 99.5, "Good", ts)); + + // Wait for event to be written + await WaitForConditionAsync(() => writtenEvents.Count >= 1); + + Assert.Single(writtenEvents); + Assert.Equal("corr-write", writtenEvents[0].CorrelationId); + Assert.Equal(SiteStreamEvent.EventOneofCase.AttributeChanged, writtenEvents[0].EventCase); + + cts.Cancel(); + await streamTask; + } + + [Fact] + public void SetReady_AllowsStreamCreation() + { + var server = CreateServer(); + // Initially not ready -- just verify the property works + server.SetReady(); + // No assertion needed -- the other tests verify that SetReady enables streaming + Assert.Equal(0, server.ActiveStreamCount); + } + + private static ServerCallContext CreateMockContext(CancellationToken cancellationToken = default) + { + var context = Substitute.For(); + context.CancellationToken.Returns(cancellationToken); + return context; + } + + private static async Task WaitForConditionAsync(Func condition, int timeoutMs = 5000) + { + var deadline = DateTime.UtcNow.AddMilliseconds(timeoutMs); + while (!condition() && DateTime.UtcNow < deadline) + { + await Task.Delay(25); + } + + Assert.True(condition(), $"Condition not met within {timeoutMs}ms"); + } +}