using System.Collections.Concurrent;
using System.Threading.Channels;
using Akka.Actor;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using GrpcStatus = Grpc.Core.Status;

namespace ScadaLink.Communication.Grpc;

/// <summary>
/// gRPC service that accepts instance stream subscriptions from central nodes.
/// Creates a StreamRelayActor per subscription to bridge Akka domain events
/// through a <see cref="Channel{T}"/> to the gRPC response stream.
/// </summary>
public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
{
    /// <summary>
    /// Capacity of the per-stream event buffer. When it is full the oldest buffered
    /// event is dropped so a slow client cannot block the relay actor.
    /// </summary>
    private const int StreamBufferCapacity = 1000;

    private readonly ISiteStreamSubscriber _streamSubscriber;
    private ActorSystem? _actorSystem;
    private readonly ILogger _logger;
    private readonly ConcurrentDictionary _activeStreams = new();
    private readonly int _maxConcurrentStreams;
    private readonly TimeSpan _maxStreamLifetime;
    private volatile bool _ready;
    private long _actorCounter;

    /// <summary>
    /// Creates a server with an explicit concurrency limit and the default
    /// 4-hour per-stream session lifetime.
    /// </summary>
    /// <param name="streamSubscriber">Registry that routes instance events to relay actors.</param>
    /// <param name="logger">Logger for stream lifecycle events.</param>
    /// <param name="maxConcurrentStreams">Maximum number of simultaneously active streams.</param>
    public SiteStreamGrpcServer(
        ISiteStreamSubscriber streamSubscriber,
        ILogger logger,
        int maxConcurrentStreams = 100)
        : this(streamSubscriber, logger, maxConcurrentStreams, TimeSpan.FromHours(4))
    {
    }

    /// <summary>
    /// DI constructor — binds <c>GrpcMaxConcurrentStreams</c> and <c>GrpcMaxStreamLifetime</c>
    /// from the options so the documented concurrency limit and the 4-hour zombie-stream
    /// session timeout are honoured rather than hard-coded.
    /// </summary>
    /// <param name="streamSubscriber">Registry that routes instance events to relay actors.</param>
    /// <param name="logger">Logger for stream lifecycle events.</param>
    /// <param name="options">Options supplying the concurrency limit and stream lifetime.</param>
    public SiteStreamGrpcServer(
        ISiteStreamSubscriber streamSubscriber,
        ILogger logger,
        IOptions options)
        : this(
            streamSubscriber,
            logger,
            options.Value.GrpcMaxConcurrentStreams,
            options.Value.GrpcMaxStreamLifetime)
    {
    }

    private SiteStreamGrpcServer(
        ISiteStreamSubscriber streamSubscriber,
        ILogger logger,
        int maxConcurrentStreams,
        TimeSpan maxStreamLifetime)
    {
        _streamSubscriber = streamSubscriber;
        _logger = logger;
        _maxConcurrentStreams = maxConcurrentStreams;
        _maxStreamLifetime = maxStreamLifetime;
    }

    /// <summary>
    /// Marks the server as ready to accept subscriptions and injects the ActorSystem.
    /// Called after the site runtime actor system is fully initialized.
    /// The ActorSystem is set here rather than via the constructor so that
    /// the gRPC server can be created by DI before the actor system exists.
    /// </summary>
    /// <param name="actorSystem">The initialized site actor system.</param>
    /// <exception cref="ArgumentNullException"><paramref name="actorSystem"/> is null.</exception>
    public void SetReady(ActorSystem actorSystem)
    {
        ArgumentNullException.ThrowIfNull(actorSystem);

        // Write the actor system before the volatile flag: a caller that observes
        // _ready == true is then guaranteed to observe _actorSystem as well.
        _actorSystem = actorSystem;
        _ready = true;
    }

    /// <summary>Number of currently active streaming subscriptions. Exposed for diagnostics.</summary>
    public int ActiveStreamCount => _activeStreams.Count;

    /// <summary>Effective max concurrent stream limit. Exposed for tests.</summary>
    internal int MaxConcurrentStreams => _maxConcurrentStreams;

    /// <summary>Effective per-stream session lifetime. Exposed for tests.</summary>
    internal TimeSpan MaxStreamLifetime => _maxStreamLifetime;

    /// <summary>
    /// Streams domain events for <c>request.InstanceUniqueName</c> to the caller until the
    /// client disconnects, the stream is replaced by a newer subscription with the same
    /// correlation id, or the configured session lifetime elapses.
    /// </summary>
    /// <param name="request">Subscription request carrying the correlation id and instance name.</param>
    /// <param name="responseStream">gRPC response stream events are written to.</param>
    /// <param name="context">Call context; its cancellation ends the stream.</param>
    /// <exception cref="RpcException">
    /// <see cref="StatusCode.Unavailable"/> before <see cref="SetReady"/> has been called;
    /// <see cref="StatusCode.ResourceExhausted"/> when the concurrent stream limit is reached.
    /// </exception>
    public override async Task SubscribeInstance(
        InstanceStreamRequest request,
        IServerStreamWriter responseStream,
        ServerCallContext context)
    {
        if (!_ready)
            throw new RpcException(new GrpcStatus(StatusCode.Unavailable, "Server not ready"));

        // Non-null once _ready is observed true (see SetReady).
        var actorSystem = _actorSystem!;

        // Duplicate prevention -- cancel existing stream for this correlationId.
        // Only cancel: the displaced call owns its CancellationTokenSource and disposes
        // it itself; disposing it here would make that call's later Token reads throw.
        if (_activeStreams.TryRemove(request.CorrelationId, out var existingEntry))
            CancelQuietly(existingEntry);

        // Check max concurrent streams after duplicate removal. Best-effort: the count
        // and the later registration are not atomic, so concurrent calls may briefly
        // exceed the limit.
        if (_activeStreams.Count >= _maxConcurrentStreams)
            throw new RpcException(new GrpcStatus(StatusCode.ResourceExhausted, "Max concurrent streams reached"));

        using var streamCts = CancellationTokenSource.CreateLinkedTokenSource(context.CancellationToken);

        // Session timeout (design doc "gRPC Connection Keepalive": 4-hour third layer
        // of dead-client detection) — forces a long-lived zombie stream to terminate
        // even if keepalive PINGs never detect the loss.
        if (_maxStreamLifetime > TimeSpan.Zero && _maxStreamLifetime != Timeout.InfiniteTimeSpan)
            streamCts.CancelAfter(_maxStreamLifetime);

        // Capture the token once instead of re-reading streamCts.Token in the loop.
        var streamToken = streamCts.Token;
        var entry = new StreamEntry(streamCts);

        var channel = Channel.CreateBounded(
            new BoundedChannelOptions(StreamBufferCapacity) { FullMode = BoundedChannelFullMode.DropOldest });

        IActorRef? relayActor = null;
        var subscribed = false;
        try
        {
            // Register inside the try so the finally always unregisters, even if actor
            // creation or subscription fails. If a concurrent subscribe with the same id
            // registered after our TryRemove above, the newer call (this one) wins and the
            // displaced stream is cancelled rather than silently orphaned.
            _activeStreams.AddOrUpdate(
                request.CorrelationId,
                entry,
                (_, displaced) =>
                {
                    CancelQuietly(displaced);
                    return entry;
                });

            var actorSeq = Interlocked.Increment(ref _actorCounter);

            // The correlation id is client-supplied; escape it so characters that are
            // not valid in an actor path element cannot make ActorOf throw.
            relayActor = actorSystem.ActorOf(
                Props.Create(typeof(Actors.StreamRelayActor), request.CorrelationId, channel.Writer),
                $"stream-relay-{Uri.EscapeDataString(request.CorrelationId)}-{actorSeq}");

            var subscriptionId = _streamSubscriber.Subscribe(request.InstanceUniqueName, relayActor);
            subscribed = true;

            _logger.LogInformation(
                "Stream {CorrelationId} started for {Instance} (subscription {SubscriptionId})",
                request.CorrelationId, request.InstanceUniqueName, subscriptionId);

            await foreach (var evt in channel.Reader.ReadAllAsync(streamToken))
            {
                await responseStream.WriteAsync(evt, streamToken);
            }
        }
        catch (OperationCanceledException)
        {
            // Normal cancellation (client disconnect, duplicate replacement, or session timeout)
        }
        finally
        {
            if (subscribed)
                _streamSubscriber.RemoveSubscriber(relayActor!);

            if (relayActor is not null)
                actorSystem.Stop(relayActor);

            channel.Writer.TryComplete();

            // Only remove our own entry -- a replacement stream may have already taken the slot
            _activeStreams.TryRemove(
                new KeyValuePair(request.CorrelationId, entry));

            _logger.LogInformation(
                "Stream {CorrelationId} for {Instance} ended",
                request.CorrelationId, request.InstanceUniqueName);
        }
    }

    /// <summary>
    /// Requests cancellation of another stream. Its owning call may already have
    /// completed and disposed the token source, in which case there is nothing to cancel.
    /// </summary>
    private static void CancelQuietly(StreamEntry entry)
    {
        try
        {
            entry.Cts.Cancel();
        }
        catch (ObjectDisposedException)
        {
            // The stream already ended and released its token source.
        }
    }

    /// <summary>
    /// Tracks a single active stream so cleanup only removes its own entry.
    /// </summary>
    private sealed record StreamEntry(CancellationTokenSource Cts);
}