using System.Collections.Concurrent;
using Grpc.Core;
using Grpc.Net.Client;
using Microsoft.Extensions.Logging;
using ScadaLink.Commons.Messages.Streaming;
using ScadaLink.Commons.Types.Enums;
using Google.Protobuf.WellKnownTypes;

namespace ScadaLink.Communication.Grpc;

/// <summary>
/// Per-site gRPC client that manages streaming subscriptions to a site's
/// SiteStreamGrpcServer. The central-side DebugStreamBridgeActor uses this
/// to open server-streaming calls for individual instances.
/// </summary>
/// <remarks>
/// Each subscription owns a linked <see cref="CancellationTokenSource"/> stored in
/// <c>_subscriptions</c>. Whichever code path removes or replaces an entry
/// (<see cref="Unsubscribe"/>, <see cref="RegisterSubscription"/>, disposal, or the
/// subscription's own completion) takes ownership of that CTS and is the only one
/// that cancels/disposes it, so a CTS is never cancelled after being disposed.
/// </remarks>
public class SiteStreamGrpcClient : IAsyncDisposable, IDisposable
{
    private readonly GrpcChannel? _channel;
    private readonly SiteStreamService.SiteStreamServiceClient? _client;
    private readonly ILogger? _logger;

    // Correlation ID -> CTS controlling that subscription's gRPC call.
    private readonly ConcurrentDictionary<string, CancellationTokenSource> _subscriptions = new();

    /// <summary>
    /// The gRPC endpoint (site node address) this client is bound to. The client
    /// factory compares this against the requested endpoint so a NodeA→NodeB
    /// failover flip (or a site address edit) is honoured rather than served stale
    /// from cache.
    /// </summary>
    public virtual string Endpoint { get; } = string.Empty;

    /// <summary>
    /// The HTTP/2 keepalive ping delay actually applied to this client's channel.
    /// Exposed for tests verifying that
    /// <see cref="CommunicationOptions.GrpcKeepAlivePingDelay"/> is honoured.
    /// </summary>
    internal TimeSpan KeepAlivePingDelay { get; }

    /// <summary>
    /// The HTTP/2 keepalive ping timeout actually applied to this client's channel.
    /// Exposed for tests verifying that
    /// <see cref="CommunicationOptions.GrpcKeepAlivePingTimeout"/> is honoured.
    /// </summary>
    internal TimeSpan KeepAlivePingTimeout { get; }

    /// <summary>
    /// Creates a client for <paramref name="endpoint"/> using the keepalive values of a
    /// default-constructed <see cref="CommunicationOptions"/>.
    /// </summary>
    /// <param name="endpoint">Site node gRPC address.</param>
    /// <param name="logger">Logger for this client.</param>
    public SiteStreamGrpcClient(string endpoint, ILogger logger)
        : this(endpoint, logger, new CommunicationOptions())
    {
    }

    /// <summary>
    /// Creates a client whose HTTP/2 keepalive is taken from <paramref name="options"/>
    /// rather than hard-coded, satisfying the design doc's "gRPC Connection Keepalive"
    /// section which states these values are configurable.
    /// </summary>
    /// <param name="endpoint">Site node gRPC address.</param>
    /// <param name="logger">Logger for this client.</param>
    /// <param name="options">Source of the keepalive ping delay and timeout.</param>
    public SiteStreamGrpcClient(string endpoint, ILogger logger, CommunicationOptions options)
    {
        Endpoint = endpoint;
        KeepAlivePingDelay = options.GrpcKeepAlivePingDelay;
        KeepAlivePingTimeout = options.GrpcKeepAlivePingTimeout;

        _channel = GrpcChannel.ForAddress(endpoint, new GrpcChannelOptions
        {
            HttpHandler = new SocketsHttpHandler
            {
                KeepAlivePingDelay = options.GrpcKeepAlivePingDelay,
                KeepAlivePingTimeout = options.GrpcKeepAlivePingTimeout,
                // Ping even when no call is active so idle links are still probed.
                KeepAlivePingPolicy = HttpKeepAlivePingPolicy.Always
            }
        });
        _client = new SiteStreamService.SiteStreamServiceClient(_channel);
        _logger = logger;
    }

    /// <summary>
    /// Protected constructor for unit testing without a real gRPC channel.
    /// Allows subclassing for mock implementations.
    /// </summary>
    protected SiteStreamGrpcClient()
    {
    }

    /// <summary>
    /// Protected constructor for unit testing — records the endpoint without
    /// opening a real gRPC channel, so endpoint-aware factory behaviour can be
    /// exercised by test doubles.
    /// </summary>
    /// <param name="endpoint">Endpoint reported by <see cref="Endpoint"/>.</param>
    protected SiteStreamGrpcClient(string endpoint)
    {
        Endpoint = endpoint;
    }

    /// <summary>
    /// Creates a test-only instance that has no gRPC channel. Used to test
    /// <see cref="Unsubscribe"/> and Dispose behavior without needing a real endpoint.
    /// </summary>
    internal static SiteStreamGrpcClient CreateForTesting() => new();

    /// <summary>
    /// Registers a CancellationTokenSource for a correlation ID. Test-only; this
    /// overwrites any existing entry without cancelling or disposing it.
    /// </summary>
    internal void AddSubscriptionForTesting(string correlationId, CancellationTokenSource cts)
    {
        _subscriptions[correlationId] = cts;
    }

    /// <summary>
    /// Registers a subscription's CancellationTokenSource for a correlation ID.
    /// If an entry already exists for that correlation ID (a reconnect race where two
    /// <see cref="SubscribeAsync"/> calls briefly share an ID), the prior CTS is
    /// cancelled and disposed so it cannot leak. Internal for testability.
    /// </summary>
    /// <param name="correlationId">Subscription key.</param>
    /// <param name="cts">CTS to store under <paramref name="correlationId"/>.</param>
    internal void RegisterSubscription(string correlationId, CancellationTokenSource cts)
    {
        CancellationTokenSource? displaced = null;

        // The swap is atomic, so the displaced CTS is owned exclusively by this call:
        // its subscription's own cleanup will find the slot taken and leave it alone.
        // AddOrUpdate may invoke a factory more than once under contention, but each
        // retry invokes exactly one factory before attempting, so after it returns
        // 'displaced' reflects the attempt that actually succeeded.
        _subscriptions.AddOrUpdate(
            correlationId,
            _ =>
            {
                displaced = null;
                return cts;
            },
            (_, existing) =>
            {
                displaced = existing;
                return cts;
            });

        if (displaced is not null && !ReferenceEquals(displaced, cts))
        {
            displaced.Cancel();
            displaced.Dispose();
        }
    }

    /// <summary>
    /// Removes the subscription entry for a correlation ID only if the stored CTS is
    /// exactly the one supplied. A racing replacement stream (or an unsubscribe) may
    /// already own the slot, in which case this is a no-op. Internal for testability.
    /// </summary>
    /// <param name="correlationId">Subscription key.</param>
    /// <param name="cts">The CTS the caller expects to find in the slot.</param>
    /// <returns>
    /// <c>true</c> if the entry was removed, meaning the caller now owns
    /// <paramref name="cts"/> and is responsible for disposing it; otherwise <c>false</c>.
    /// </returns>
    internal bool RemoveSubscription(string correlationId, CancellationTokenSource cts) =>
        _subscriptions.TryRemove(new KeyValuePair<string, CancellationTokenSource>(correlationId, cts));

    /// <summary>
    /// Opens a server-streaming subscription for a specific instance.
    /// This is a long-running async method; the caller launches it as a background task.
    /// The <paramref name="onEvent"/> callback delivers domain events, and
    /// <paramref name="onError"/> lets the caller handle reconnection.
    /// </summary>
    /// <param name="correlationId">Key used to track (and later cancel) this subscription.</param>
    /// <param name="instanceUniqueName">Instance to stream events for.</param>
    /// <param name="onEvent">Invoked with each converted, non-null domain event.</param>
    /// <param name="onError">
    /// Invoked with any failure other than cancellation; cancellation completes silently.
    /// </param>
    /// <param name="ct">Caller token; linked into the subscription's own CTS.</param>
    /// <exception cref="InvalidOperationException">The client has no gRPC channel (test-only instance).</exception>
    public virtual async Task SubscribeAsync(
        string correlationId,
        string instanceUniqueName,
        Action<object> onEvent,
        Action<Exception> onError,
        CancellationToken ct)
    {
        if (_client is null)
        {
            throw new InvalidOperationException("Cannot subscribe on a test-only client.");
        }

        var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);

        // Read the token before publishing the CTS: once registered, a racing
        // Unsubscribe or replacement may dispose it, after which cts.Token throws.
        var token = cts.Token;
        RegisterSubscription(correlationId, cts);

        var request = new InstanceStreamRequest
        {
            CorrelationId = correlationId,
            InstanceUniqueName = instanceUniqueName
        };

        try
        {
            using var call = _client.SubscribeInstance(request, cancellationToken: token);
            await foreach (var evt in call.ResponseStream.ReadAllAsync(token))
            {
                var domainEvent = ConvertToDomainEvent(evt);
                if (domainEvent != null)
                {
                    onEvent(domainEvent);
                }
            }
        }
        catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
        {
            // Normal cancellation — not an error
        }
        catch (OperationCanceledException) when (token.IsCancellationRequested)
        {
            // Our own cancellation can also surface as OperationCanceledException
            // (e.g. from the read loop); it is not a failure and must not trigger
            // the caller's reconnect path.
        }
        catch (Exception ex)
        {
            onError(ex);
        }
        finally
        {
            // Remove only our own entry -- a racing reconnect may already own the slot.
            // If we removed it, nobody else will dispose the CTS, so we must: a linked
            // CTS left undisposed keeps its registration on the caller's token alive.
            if (RemoveSubscription(correlationId, cts))
            {
                cts.Dispose();
            }
        }
    }

    /// <summary>
    /// Cancels an active subscription by correlation ID. Does nothing if no
    /// subscription is registered under that ID.
    /// </summary>
    /// <param name="correlationId">Subscription key to cancel.</param>
    public virtual void Unsubscribe(string correlationId)
    {
        // TryRemove transfers ownership of the CTS to this call.
        if (_subscriptions.TryRemove(correlationId, out var cts))
        {
            cts.Cancel();
            cts.Dispose();
        }
    }

    /// <summary>
    /// Converts a proto SiteStreamEvent to the corresponding domain message.
    /// Internal for testability.
    /// </summary>
    /// <returns>
    /// An <see cref="AttributeValueChanged"/> or <see cref="AlarmStateChanged"/>, or
    /// <c>null</c> when the event carries neither payload.
    /// </returns>
    internal static object? ConvertToDomainEvent(SiteStreamEvent evt) => evt.EventCase switch
    {
        SiteStreamEvent.EventOneofCase.AttributeChanged => new AttributeValueChanged(
            evt.AttributeChanged.InstanceUniqueName,
            evt.AttributeChanged.AttributePath,
            evt.AttributeChanged.AttributeName,
            evt.AttributeChanged.Value,
            MapQuality(evt.AttributeChanged.Quality),
            evt.AttributeChanged.Timestamp.ToDateTimeOffset()),

        SiteStreamEvent.EventOneofCase.AlarmChanged => new AlarmStateChanged(
            evt.AlarmChanged.InstanceUniqueName,
            evt.AlarmChanged.AlarmName,
            MapAlarmState(evt.AlarmChanged.State),
            evt.AlarmChanged.Priority,
            evt.AlarmChanged.Timestamp.ToDateTimeOffset())
        {
            Level = MapAlarmLevel(evt.AlarmChanged.Level),
            Message = evt.AlarmChanged.Message ?? string.Empty
        },

        _ => null
    };

    /// <summary>
    /// Maps proto Quality enum to domain string ("Good", "Uncertain", "Bad", or
    /// "Unknown" for any other value). Internal for testability.
    /// </summary>
    internal static string MapQuality(Quality quality) => quality switch
    {
        Quality.Good => "Good",
        Quality.Uncertain => "Uncertain",
        Quality.Bad => "Bad",
        _ => "Unknown"
    };

    /// <summary>
    /// Maps proto AlarmStateEnum to domain AlarmState. Anything other than
    /// AlarmStateActive maps to <c>AlarmState.Normal</c>. Internal for testability.
    /// </summary>
    internal static AlarmState MapAlarmState(AlarmStateEnum state) => state switch
    {
        AlarmStateEnum.AlarmStateNormal => AlarmState.Normal,
        AlarmStateEnum.AlarmStateActive => AlarmState.Active,
        _ => AlarmState.Normal
    };

    /// <summary>
    /// Maps proto AlarmLevelEnum to domain AlarmLevel; unrecognised values map to
    /// <c>AlarmLevel.None</c>. Internal for testability.
    /// </summary>
    internal static AlarmLevel MapAlarmLevel(AlarmLevelEnum level) => level switch
    {
        AlarmLevelEnum.AlarmLevelLow => AlarmLevel.Low,
        AlarmLevelEnum.AlarmLevelLowLow => AlarmLevel.LowLow,
        AlarmLevelEnum.AlarmLevelHigh => AlarmLevel.High,
        AlarmLevelEnum.AlarmLevelHighHigh => AlarmLevel.HighHigh,
        _ => AlarmLevel.None
    };

    /// <summary>
    /// Releases all subscription CancellationTokenSources and the underlying
    /// gRPC channel. All teardown here is synchronous (CTS disposal and
    /// <see cref="GrpcChannel.Dispose"/>), so a synchronous <see cref="Dispose"/>
    /// can release everything without sync-over-async blocking.
    /// </summary>
    private void ReleaseResources()
    {
        // Take each entry out of the dictionary before cancelling/disposing it, so a
        // subscription completing concurrently cannot also dispose the same CTS
        // (or have it cancelled after disposal). Keys is a point-in-time snapshot;
        // any subscription registered later fails against the disposed channel and
        // cleans up its own entry.
        foreach (var correlationId in _subscriptions.Keys)
        {
            if (_subscriptions.TryRemove(correlationId, out var cts))
            {
                cts.Cancel();
                cts.Dispose();
            }
        }

        _channel?.Dispose();
    }

    /// <summary>
    /// Asynchronous disposal; completes synchronously because all teardown is synchronous.
    /// </summary>
    public virtual ValueTask DisposeAsync()
    {
        ReleaseResources();
        return ValueTask.CompletedTask;
    }

    /// <summary>
    /// Synchronous disposal. All resources held by this client are released
    /// synchronously, so callers need not block on the async disposal path.
    /// </summary>
    public virtual void Dispose()
    {
        ReleaseResources();
    }
}