diff --git a/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClient.cs b/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClient.cs new file mode 100644 index 0000000..bfc62b6 --- /dev/null +++ b/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClient.cs @@ -0,0 +1,178 @@ +using System.Collections.Concurrent; +using Grpc.Core; +using Grpc.Net.Client; +using Microsoft.Extensions.Logging; +using ScadaLink.Commons.Messages.Streaming; +using ScadaLink.Commons.Types.Enums; +using Google.Protobuf.WellKnownTypes; + +namespace ScadaLink.Communication.Grpc; + +/// +/// Per-site gRPC client that manages streaming subscriptions to a site's +/// SiteStreamGrpcServer. The central-side DebugStreamBridgeActor uses this +/// to open server-streaming calls for individual instances. +/// +public class SiteStreamGrpcClient : IAsyncDisposable +{ + private readonly GrpcChannel? _channel; + private readonly SiteStreamService.SiteStreamServiceClient? _client; + private readonly ILogger? _logger; + private readonly ConcurrentDictionary _subscriptions = new(); + + public SiteStreamGrpcClient(string endpoint, ILogger logger) + { + _channel = GrpcChannel.ForAddress(endpoint, new GrpcChannelOptions + { + HttpHandler = new SocketsHttpHandler + { + KeepAlivePingDelay = TimeSpan.FromSeconds(15), + KeepAlivePingTimeout = TimeSpan.FromSeconds(10), + KeepAlivePingPolicy = HttpKeepAlivePingPolicy.Always + } + }); + _client = new SiteStreamService.SiteStreamServiceClient(_channel); + _logger = logger; + } + + /// + /// Private constructor for unit testing without a real gRPC channel. + /// + private SiteStreamGrpcClient() + { + } + + /// + /// Creates a test-only instance that has no gRPC channel. Used to test + /// Unsubscribe and Dispose behavior without needing a real endpoint. + /// + internal static SiteStreamGrpcClient CreateForTesting() => new(); + + /// + /// Registers a CancellationTokenSource for a correlation ID. Test-only. + /// + internal void AddSubscriptionForTesting(string correlationId, CancellationTokenSource cts) + { + _subscriptions[correlationId] = cts; + } + + /// + /// Opens a server-streaming subscription for a specific instance. + /// This is a long-running async method; the caller launches it as a background task. + /// The callback delivers domain events, and + /// lets the caller handle reconnection. + /// + public async Task SubscribeAsync( + string correlationId, + string instanceUniqueName, + Action onEvent, + Action onError, + CancellationToken ct) + { + if (_client is null) + throw new InvalidOperationException("Cannot subscribe on a test-only client."); + + var cts = CancellationTokenSource.CreateLinkedTokenSource(ct); + _subscriptions[correlationId] = cts; + + var request = new InstanceStreamRequest + { + CorrelationId = correlationId, + InstanceUniqueName = instanceUniqueName + }; + + try + { + using var call = _client.SubscribeInstance(request, cancellationToken: cts.Token); + + await foreach (var evt in call.ResponseStream.ReadAllAsync(cts.Token)) + { + var domainEvent = ConvertToDomainEvent(evt); + if (domainEvent != null) + onEvent(domainEvent); + } + } + catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled) + { + // Normal cancellation — not an error + } + catch (Exception ex) + { + onError(ex); + } + finally + { + _subscriptions.TryRemove(correlationId, out _); + } + } + + /// + /// Cancels an active subscription by correlation ID. + /// + public void Unsubscribe(string correlationId) + { + if (_subscriptions.TryRemove(correlationId, out var cts)) + { + cts.Cancel(); + cts.Dispose(); + } + } + + /// + /// Converts a proto SiteStreamEvent to the corresponding domain message. + /// Internal for testability. + /// + internal static object? ConvertToDomainEvent(SiteStreamEvent evt) => evt.EventCase switch + { + SiteStreamEvent.EventOneofCase.AttributeChanged => new AttributeValueChanged( + evt.AttributeChanged.InstanceUniqueName, + evt.AttributeChanged.AttributePath, + evt.AttributeChanged.AttributeName, + evt.AttributeChanged.Value, + MapQuality(evt.AttributeChanged.Quality), + evt.AttributeChanged.Timestamp.ToDateTimeOffset()), + SiteStreamEvent.EventOneofCase.AlarmChanged => new AlarmStateChanged( + evt.AlarmChanged.InstanceUniqueName, + evt.AlarmChanged.AlarmName, + MapAlarmState(evt.AlarmChanged.State), + evt.AlarmChanged.Priority, + evt.AlarmChanged.Timestamp.ToDateTimeOffset()), + _ => null + }; + + /// + /// Maps proto Quality enum to domain string. Internal for testability. + /// + internal static string MapQuality(Quality quality) => quality switch + { + Quality.Good => "Good", + Quality.Uncertain => "Uncertain", + Quality.Bad => "Bad", + _ => "Unknown" + }; + + /// + /// Maps proto AlarmStateEnum to domain AlarmState. Internal for testability. + /// + internal static AlarmState MapAlarmState(AlarmStateEnum state) => state switch + { + AlarmStateEnum.AlarmStateNormal => AlarmState.Normal, + AlarmStateEnum.AlarmStateActive => AlarmState.Active, + _ => AlarmState.Normal + }; + + public async ValueTask DisposeAsync() + { + foreach (var cts in _subscriptions.Values) + { + cts.Cancel(); + cts.Dispose(); + } + _subscriptions.Clear(); + + if (_channel is not null) + _channel.Dispose(); + + await ValueTask.CompletedTask; + } +} diff --git a/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClientFactory.cs b/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClientFactory.cs new file mode 100644 index 0000000..406e8c2 --- /dev/null +++ b/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClientFactory.cs @@ -0,0 +1,57 @@ +using System.Collections.Concurrent; +using Microsoft.Extensions.Logging; + +namespace ScadaLink.Communication.Grpc; + +/// +/// Caches one per site identifier. +/// The DebugStreamBridgeActor uses this factory to obtain (or create) a +/// gRPC client for a given site before opening a streaming subscription. +/// +public class SiteStreamGrpcClientFactory : IAsyncDisposable, IDisposable +{ + private readonly ConcurrentDictionary _clients = new(); + private readonly ILoggerFactory _loggerFactory; + + public SiteStreamGrpcClientFactory(ILoggerFactory loggerFactory) + { + _loggerFactory = loggerFactory; + } + + /// + /// Returns an existing client for the site or creates a new one. + /// + public SiteStreamGrpcClient GetOrCreate(string siteIdentifier, string grpcEndpoint) + { + return _clients.GetOrAdd(siteIdentifier, _ => + { + var logger = _loggerFactory.CreateLogger(); + return new SiteStreamGrpcClient(grpcEndpoint, logger); + }); + } + + /// + /// Removes and disposes the client for the given site. + /// + public async Task RemoveSiteAsync(string siteIdentifier) + { + if (_clients.TryRemove(siteIdentifier, out var client)) + { + await client.DisposeAsync(); + } + } + + public async ValueTask DisposeAsync() + { + foreach (var client in _clients.Values) + { + await client.DisposeAsync(); + } + _clients.Clear(); + } + + public void Dispose() + { + DisposeAsync().AsTask().GetAwaiter().GetResult(); + } +} diff --git a/src/ScadaLink.Communication/ScadaLink.Communication.csproj b/src/ScadaLink.Communication/ScadaLink.Communication.csproj index 0d77342..3caeb10 100644 --- a/src/ScadaLink.Communication/ScadaLink.Communication.csproj +++ b/src/ScadaLink.Communication/ScadaLink.Communication.csproj @@ -7,6 +7,10 @@ true + + + + diff --git a/tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcClientFactoryTests.cs b/tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcClientFactoryTests.cs new file mode 100644 index 0000000..fe5bb54 --- /dev/null +++ b/tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcClientFactoryTests.cs @@ -0,0 +1,66 @@ +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using ScadaLink.Communication.Grpc; + +namespace ScadaLink.Communication.Tests.Grpc; + +public class SiteStreamGrpcClientFactoryTests +{ + private readonly ILoggerFactory _loggerFactory = NullLoggerFactory.Instance; + + [Fact] + public void GetOrCreate_ReturnsSameClientForSameSite() + { + using var factory = new SiteStreamGrpcClientFactory(_loggerFactory); + + var client1 = factory.GetOrCreate("site-a", "http://localhost:5100"); + var client2 = factory.GetOrCreate("site-a", "http://localhost:5100"); + + Assert.Same(client1, client2); + } + + [Fact] + public void GetOrCreate_ReturnsDifferentClientsForDifferentSites() + { + using var factory = new SiteStreamGrpcClientFactory(_loggerFactory); + + var client1 = factory.GetOrCreate("site-a", "http://localhost:5100"); + var client2 = factory.GetOrCreate("site-b", "http://localhost:5200"); + + Assert.NotSame(client1, client2); + } + + [Fact] + public async Task RemoveSite_DisposesClient() + { + var factory = new SiteStreamGrpcClientFactory(_loggerFactory); + + var client1 = factory.GetOrCreate("site-a", "http://localhost:5100"); + await factory.RemoveSiteAsync("site-a"); + + // After removal, GetOrCreate should return a new instance + var client2 = factory.GetOrCreate("site-a", "http://localhost:5100"); + Assert.NotSame(client1, client2); + } + + [Fact] + public async Task RemoveSite_NonExistent_DoesNotThrow() + { + var factory = new SiteStreamGrpcClientFactory(_loggerFactory); + await factory.RemoveSiteAsync("does-not-exist"); // Should not throw + } + + [Fact] + public async Task DisposeAsync_DisposesAllClients() + { + var factory = new SiteStreamGrpcClientFactory(_loggerFactory); + + factory.GetOrCreate("site-a", "http://localhost:5100"); + factory.GetOrCreate("site-b", "http://localhost:5200"); + + await factory.DisposeAsync(); + + // After dispose, creating new clients should work (new instances) + // This tests that Dispose doesn't throw + } +} diff --git a/tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcClientTests.cs b/tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcClientTests.cs new file mode 100644 index 0000000..698ef2f --- /dev/null +++ b/tests/ScadaLink.Communication.Tests/Grpc/SiteStreamGrpcClientTests.cs @@ -0,0 +1,141 @@ +using Google.Protobuf.WellKnownTypes; +using ScadaLink.Communication.Grpc; +using ScadaLink.Commons.Messages.Streaming; +using ScadaLink.Commons.Types.Enums; + +namespace ScadaLink.Communication.Tests.Grpc; + +public class SiteStreamGrpcClientTests +{ + [Fact] + public void ConvertToDomainEvent_AttributeChanged_MapsCorrectly() + { + var ts = DateTimeOffset.UtcNow; + var evt = new SiteStreamEvent + { + CorrelationId = "corr-1", + AttributeChanged = new AttributeValueUpdate + { + InstanceUniqueName = "Site1.Pump01", + AttributePath = "Modules.IO", + AttributeName = "Temperature", + Value = "42.5", + Quality = Quality.Good, + Timestamp = Timestamp.FromDateTimeOffset(ts) + } + }; + + var result = SiteStreamGrpcClient.ConvertToDomainEvent(evt); + + var attr = Assert.IsType(result); + Assert.Equal("Site1.Pump01", attr.InstanceUniqueName); + Assert.Equal("Modules.IO", attr.AttributePath); + Assert.Equal("Temperature", attr.AttributeName); + Assert.Equal("42.5", attr.Value); + Assert.Equal("Good", attr.Quality); + Assert.Equal(ts, attr.Timestamp); + } + + [Fact] + public void ConvertToDomainEvent_AlarmChanged_MapsCorrectly() + { + var ts = DateTimeOffset.UtcNow; + var evt = new SiteStreamEvent + { + CorrelationId = "corr-2", + AlarmChanged = new AlarmStateUpdate + { + InstanceUniqueName = "Site1.Motor01", + AlarmName = "OverTemp", + State = AlarmStateEnum.AlarmStateActive, + Priority = 3, + Timestamp = Timestamp.FromDateTimeOffset(ts) + } + }; + + var result = SiteStreamGrpcClient.ConvertToDomainEvent(evt); + + var alarm = Assert.IsType(result); + Assert.Equal("Site1.Motor01", alarm.InstanceUniqueName); + Assert.Equal("OverTemp", alarm.AlarmName); + Assert.Equal(AlarmState.Active, alarm.State); + Assert.Equal(3, alarm.Priority); + Assert.Equal(ts, alarm.Timestamp); + } + + [Fact] + public void ConvertToDomainEvent_UnknownEvent_ReturnsNull() + { + var evt = new SiteStreamEvent + { + CorrelationId = "corr-3" + // No oneof case set + }; + + var result = SiteStreamGrpcClient.ConvertToDomainEvent(evt); + + Assert.Null(result); + } + + [Theory] + [InlineData(Quality.Good, "Good")] + [InlineData(Quality.Uncertain, "Uncertain")] + [InlineData(Quality.Bad, "Bad")] + [InlineData(Quality.Unspecified, "Unknown")] + public void MapQuality_AllValues(Quality input, string expected) + { + var result = SiteStreamGrpcClient.MapQuality(input); + Assert.Equal(expected, result); + } + + [Theory] + [InlineData(AlarmStateEnum.AlarmStateNormal, AlarmState.Normal)] + [InlineData(AlarmStateEnum.AlarmStateActive, AlarmState.Active)] + [InlineData(AlarmStateEnum.AlarmStateUnspecified, AlarmState.Normal)] + public void MapAlarmState_AllValues(AlarmStateEnum input, AlarmState expected) + { + var result = SiteStreamGrpcClient.MapAlarmState(input); + Assert.Equal(expected, result); + } + + [Fact] + public void Unsubscribe_CancelsSubscription() + { + // We can't easily test the full Subscribe flow without a real gRPC server, + // but we can test the Unsubscribe path by registering a CTS directly. + // Use the internal AddSubscription helper for testability. + var client = SiteStreamGrpcClient.CreateForTesting(); + + var cts = new CancellationTokenSource(); + client.AddSubscriptionForTesting("corr-test", cts); + + Assert.False(cts.IsCancellationRequested); + + client.Unsubscribe("corr-test"); + + Assert.True(cts.IsCancellationRequested); + } + + [Fact] + public void Unsubscribe_NonExistent_DoesNotThrow() + { + var client = SiteStreamGrpcClient.CreateForTesting(); + client.Unsubscribe("does-not-exist"); // Should not throw + } + + [Fact] + public async Task DisposeAsync_CancelsAllSubscriptions() + { + var client = SiteStreamGrpcClient.CreateForTesting(); + + var cts1 = new CancellationTokenSource(); + var cts2 = new CancellationTokenSource(); + client.AddSubscriptionForTesting("corr-1", cts1); + client.AddSubscriptionForTesting("corr-2", cts2); + + await client.DisposeAsync(); + + Assert.True(cts1.IsCancellationRequested); + Assert.True(cts2.IsCancellationRequested); + } +}