feat: add SiteStreamGrpcClient and SiteStreamGrpcClientFactory
Per-site gRPC client for central-side streaming subscriptions to site servers. SiteStreamGrpcClient manages server-streaming calls with keepalive, converts proto events to domain types, and supports cancellation via Unsubscribe. SiteStreamGrpcClientFactory caches one client per site identifier. Includes InternalsVisibleTo for test access to conversion helpers and comprehensive unit tests for event mapping, quality/alarm-state conversion, unsubscribe behavior, and factory caching.
This commit is contained in:
178
src/ScadaLink.Communication/Grpc/SiteStreamGrpcClient.cs
Normal file
178
src/ScadaLink.Communication/Grpc/SiteStreamGrpcClient.cs
Normal file
@@ -0,0 +1,178 @@
|
||||
using System.Collections.Concurrent;
|
||||
using Grpc.Core;
|
||||
using Grpc.Net.Client;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using ScadaLink.Commons.Messages.Streaming;
|
||||
using ScadaLink.Commons.Types.Enums;
|
||||
using Google.Protobuf.WellKnownTypes;
|
||||
|
||||
namespace ScadaLink.Communication.Grpc;
|
||||
|
||||
/// <summary>
|
||||
/// Per-site gRPC client that manages streaming subscriptions to a site's
|
||||
/// SiteStreamGrpcServer. The central-side DebugStreamBridgeActor uses this
|
||||
/// to open server-streaming calls for individual instances.
|
||||
/// </summary>
|
||||
public class SiteStreamGrpcClient : IAsyncDisposable
|
||||
{
|
||||
private readonly GrpcChannel? _channel;
|
||||
private readonly SiteStreamService.SiteStreamServiceClient? _client;
|
||||
private readonly ILogger? _logger;
|
||||
private readonly ConcurrentDictionary<string, CancellationTokenSource> _subscriptions = new();
|
||||
|
||||
public SiteStreamGrpcClient(string endpoint, ILogger logger)
|
||||
{
|
||||
_channel = GrpcChannel.ForAddress(endpoint, new GrpcChannelOptions
|
||||
{
|
||||
HttpHandler = new SocketsHttpHandler
|
||||
{
|
||||
KeepAlivePingDelay = TimeSpan.FromSeconds(15),
|
||||
KeepAlivePingTimeout = TimeSpan.FromSeconds(10),
|
||||
KeepAlivePingPolicy = HttpKeepAlivePingPolicy.Always
|
||||
}
|
||||
});
|
||||
_client = new SiteStreamService.SiteStreamServiceClient(_channel);
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Private constructor for unit testing without a real gRPC channel.
|
||||
/// </summary>
|
||||
private SiteStreamGrpcClient()
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates a test-only instance that has no gRPC channel. Used to test
|
||||
/// Unsubscribe and Dispose behavior without needing a real endpoint.
|
||||
/// </summary>
|
||||
internal static SiteStreamGrpcClient CreateForTesting() => new();
|
||||
|
||||
/// <summary>
|
||||
/// Registers a CancellationTokenSource for a correlation ID. Test-only.
|
||||
/// </summary>
|
||||
internal void AddSubscriptionForTesting(string correlationId, CancellationTokenSource cts)
|
||||
{
|
||||
_subscriptions[correlationId] = cts;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Opens a server-streaming subscription for a specific instance.
|
||||
/// This is a long-running async method; the caller launches it as a background task.
|
||||
/// The <paramref name="onEvent"/> callback delivers domain events, and
|
||||
/// <paramref name="onError"/> lets the caller handle reconnection.
|
||||
/// </summary>
|
||||
public async Task SubscribeAsync(
|
||||
string correlationId,
|
||||
string instanceUniqueName,
|
||||
Action<object> onEvent,
|
||||
Action<Exception> onError,
|
||||
CancellationToken ct)
|
||||
{
|
||||
if (_client is null)
|
||||
throw new InvalidOperationException("Cannot subscribe on a test-only client.");
|
||||
|
||||
var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
|
||||
_subscriptions[correlationId] = cts;
|
||||
|
||||
var request = new InstanceStreamRequest
|
||||
{
|
||||
CorrelationId = correlationId,
|
||||
InstanceUniqueName = instanceUniqueName
|
||||
};
|
||||
|
||||
try
|
||||
{
|
||||
using var call = _client.SubscribeInstance(request, cancellationToken: cts.Token);
|
||||
|
||||
await foreach (var evt in call.ResponseStream.ReadAllAsync(cts.Token))
|
||||
{
|
||||
var domainEvent = ConvertToDomainEvent(evt);
|
||||
if (domainEvent != null)
|
||||
onEvent(domainEvent);
|
||||
}
|
||||
}
|
||||
catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
|
||||
{
|
||||
// Normal cancellation — not an error
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
onError(ex);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_subscriptions.TryRemove(correlationId, out _);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Cancels an active subscription by correlation ID.
|
||||
/// </summary>
|
||||
public void Unsubscribe(string correlationId)
|
||||
{
|
||||
if (_subscriptions.TryRemove(correlationId, out var cts))
|
||||
{
|
||||
cts.Cancel();
|
||||
cts.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Converts a proto SiteStreamEvent to the corresponding domain message.
|
||||
/// Internal for testability.
|
||||
/// </summary>
|
||||
internal static object? ConvertToDomainEvent(SiteStreamEvent evt) => evt.EventCase switch
|
||||
{
|
||||
SiteStreamEvent.EventOneofCase.AttributeChanged => new AttributeValueChanged(
|
||||
evt.AttributeChanged.InstanceUniqueName,
|
||||
evt.AttributeChanged.AttributePath,
|
||||
evt.AttributeChanged.AttributeName,
|
||||
evt.AttributeChanged.Value,
|
||||
MapQuality(evt.AttributeChanged.Quality),
|
||||
evt.AttributeChanged.Timestamp.ToDateTimeOffset()),
|
||||
SiteStreamEvent.EventOneofCase.AlarmChanged => new AlarmStateChanged(
|
||||
evt.AlarmChanged.InstanceUniqueName,
|
||||
evt.AlarmChanged.AlarmName,
|
||||
MapAlarmState(evt.AlarmChanged.State),
|
||||
evt.AlarmChanged.Priority,
|
||||
evt.AlarmChanged.Timestamp.ToDateTimeOffset()),
|
||||
_ => null
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
/// Maps proto Quality enum to domain string. Internal for testability.
|
||||
/// </summary>
|
||||
internal static string MapQuality(Quality quality) => quality switch
|
||||
{
|
||||
Quality.Good => "Good",
|
||||
Quality.Uncertain => "Uncertain",
|
||||
Quality.Bad => "Bad",
|
||||
_ => "Unknown"
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
/// Maps proto AlarmStateEnum to domain AlarmState. Internal for testability.
|
||||
/// </summary>
|
||||
internal static AlarmState MapAlarmState(AlarmStateEnum state) => state switch
|
||||
{
|
||||
AlarmStateEnum.AlarmStateNormal => AlarmState.Normal,
|
||||
AlarmStateEnum.AlarmStateActive => AlarmState.Active,
|
||||
_ => AlarmState.Normal
|
||||
};
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
foreach (var cts in _subscriptions.Values)
|
||||
{
|
||||
cts.Cancel();
|
||||
cts.Dispose();
|
||||
}
|
||||
_subscriptions.Clear();
|
||||
|
||||
if (_channel is not null)
|
||||
_channel.Dispose();
|
||||
|
||||
await ValueTask.CompletedTask;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,57 @@
|
||||
using System.Collections.Concurrent;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace ScadaLink.Communication.Grpc;
|
||||
|
||||
/// <summary>
|
||||
/// Caches one <see cref="SiteStreamGrpcClient"/> per site identifier.
|
||||
/// The DebugStreamBridgeActor uses this factory to obtain (or create) a
|
||||
/// gRPC client for a given site before opening a streaming subscription.
|
||||
/// </summary>
|
||||
public class SiteStreamGrpcClientFactory : IAsyncDisposable, IDisposable
|
||||
{
|
||||
private readonly ConcurrentDictionary<string, SiteStreamGrpcClient> _clients = new();
|
||||
private readonly ILoggerFactory _loggerFactory;
|
||||
|
||||
public SiteStreamGrpcClientFactory(ILoggerFactory loggerFactory)
|
||||
{
|
||||
_loggerFactory = loggerFactory;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns an existing client for the site or creates a new one.
|
||||
/// </summary>
|
||||
public SiteStreamGrpcClient GetOrCreate(string siteIdentifier, string grpcEndpoint)
|
||||
{
|
||||
return _clients.GetOrAdd(siteIdentifier, _ =>
|
||||
{
|
||||
var logger = _loggerFactory.CreateLogger<SiteStreamGrpcClient>();
|
||||
return new SiteStreamGrpcClient(grpcEndpoint, logger);
|
||||
});
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Removes and disposes the client for the given site.
|
||||
/// </summary>
|
||||
public async Task RemoveSiteAsync(string siteIdentifier)
|
||||
{
|
||||
if (_clients.TryRemove(siteIdentifier, out var client))
|
||||
{
|
||||
await client.DisposeAsync();
|
||||
}
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
foreach (var client in _clients.Values)
|
||||
{
|
||||
await client.DisposeAsync();
|
||||
}
|
||||
_clients.Clear();
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
DisposeAsync().AsTask().GetAwaiter().GetResult();
|
||||
}
|
||||
}
|
||||
@@ -7,6 +7,10 @@
|
||||
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<InternalsVisibleTo Include="ScadaLink.Communication.Tests" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<FrameworkReference Include="Microsoft.AspNetCore.App" />
|
||||
</ItemGroup>
|
||||
|
||||
@@ -0,0 +1,66 @@
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using ScadaLink.Communication.Grpc;
|
||||
|
||||
namespace ScadaLink.Communication.Tests.Grpc;
|
||||
|
||||
public class SiteStreamGrpcClientFactoryTests
|
||||
{
|
||||
private readonly ILoggerFactory _loggerFactory = NullLoggerFactory.Instance;
|
||||
|
||||
[Fact]
|
||||
public void GetOrCreate_ReturnsSameClientForSameSite()
|
||||
{
|
||||
using var factory = new SiteStreamGrpcClientFactory(_loggerFactory);
|
||||
|
||||
var client1 = factory.GetOrCreate("site-a", "http://localhost:5100");
|
||||
var client2 = factory.GetOrCreate("site-a", "http://localhost:5100");
|
||||
|
||||
Assert.Same(client1, client2);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void GetOrCreate_ReturnsDifferentClientsForDifferentSites()
|
||||
{
|
||||
using var factory = new SiteStreamGrpcClientFactory(_loggerFactory);
|
||||
|
||||
var client1 = factory.GetOrCreate("site-a", "http://localhost:5100");
|
||||
var client2 = factory.GetOrCreate("site-b", "http://localhost:5200");
|
||||
|
||||
Assert.NotSame(client1, client2);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task RemoveSite_DisposesClient()
|
||||
{
|
||||
var factory = new SiteStreamGrpcClientFactory(_loggerFactory);
|
||||
|
||||
var client1 = factory.GetOrCreate("site-a", "http://localhost:5100");
|
||||
await factory.RemoveSiteAsync("site-a");
|
||||
|
||||
// After removal, GetOrCreate should return a new instance
|
||||
var client2 = factory.GetOrCreate("site-a", "http://localhost:5100");
|
||||
Assert.NotSame(client1, client2);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task RemoveSite_NonExistent_DoesNotThrow()
|
||||
{
|
||||
var factory = new SiteStreamGrpcClientFactory(_loggerFactory);
|
||||
await factory.RemoveSiteAsync("does-not-exist"); // Should not throw
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task DisposeAsync_DisposesAllClients()
|
||||
{
|
||||
var factory = new SiteStreamGrpcClientFactory(_loggerFactory);
|
||||
|
||||
factory.GetOrCreate("site-a", "http://localhost:5100");
|
||||
factory.GetOrCreate("site-b", "http://localhost:5200");
|
||||
|
||||
await factory.DisposeAsync();
|
||||
|
||||
// After dispose, creating new clients should work (new instances)
|
||||
// This tests that Dispose doesn't throw
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,141 @@
|
||||
using Google.Protobuf.WellKnownTypes;
|
||||
using ScadaLink.Communication.Grpc;
|
||||
using ScadaLink.Commons.Messages.Streaming;
|
||||
using ScadaLink.Commons.Types.Enums;
|
||||
|
||||
namespace ScadaLink.Communication.Tests.Grpc;
|
||||
|
||||
public class SiteStreamGrpcClientTests
|
||||
{
|
||||
[Fact]
|
||||
public void ConvertToDomainEvent_AttributeChanged_MapsCorrectly()
|
||||
{
|
||||
var ts = DateTimeOffset.UtcNow;
|
||||
var evt = new SiteStreamEvent
|
||||
{
|
||||
CorrelationId = "corr-1",
|
||||
AttributeChanged = new AttributeValueUpdate
|
||||
{
|
||||
InstanceUniqueName = "Site1.Pump01",
|
||||
AttributePath = "Modules.IO",
|
||||
AttributeName = "Temperature",
|
||||
Value = "42.5",
|
||||
Quality = Quality.Good,
|
||||
Timestamp = Timestamp.FromDateTimeOffset(ts)
|
||||
}
|
||||
};
|
||||
|
||||
var result = SiteStreamGrpcClient.ConvertToDomainEvent(evt);
|
||||
|
||||
var attr = Assert.IsType<AttributeValueChanged>(result);
|
||||
Assert.Equal("Site1.Pump01", attr.InstanceUniqueName);
|
||||
Assert.Equal("Modules.IO", attr.AttributePath);
|
||||
Assert.Equal("Temperature", attr.AttributeName);
|
||||
Assert.Equal("42.5", attr.Value);
|
||||
Assert.Equal("Good", attr.Quality);
|
||||
Assert.Equal(ts, attr.Timestamp);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ConvertToDomainEvent_AlarmChanged_MapsCorrectly()
|
||||
{
|
||||
var ts = DateTimeOffset.UtcNow;
|
||||
var evt = new SiteStreamEvent
|
||||
{
|
||||
CorrelationId = "corr-2",
|
||||
AlarmChanged = new AlarmStateUpdate
|
||||
{
|
||||
InstanceUniqueName = "Site1.Motor01",
|
||||
AlarmName = "OverTemp",
|
||||
State = AlarmStateEnum.AlarmStateActive,
|
||||
Priority = 3,
|
||||
Timestamp = Timestamp.FromDateTimeOffset(ts)
|
||||
}
|
||||
};
|
||||
|
||||
var result = SiteStreamGrpcClient.ConvertToDomainEvent(evt);
|
||||
|
||||
var alarm = Assert.IsType<AlarmStateChanged>(result);
|
||||
Assert.Equal("Site1.Motor01", alarm.InstanceUniqueName);
|
||||
Assert.Equal("OverTemp", alarm.AlarmName);
|
||||
Assert.Equal(AlarmState.Active, alarm.State);
|
||||
Assert.Equal(3, alarm.Priority);
|
||||
Assert.Equal(ts, alarm.Timestamp);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ConvertToDomainEvent_UnknownEvent_ReturnsNull()
|
||||
{
|
||||
var evt = new SiteStreamEvent
|
||||
{
|
||||
CorrelationId = "corr-3"
|
||||
// No oneof case set
|
||||
};
|
||||
|
||||
var result = SiteStreamGrpcClient.ConvertToDomainEvent(evt);
|
||||
|
||||
Assert.Null(result);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData(Quality.Good, "Good")]
|
||||
[InlineData(Quality.Uncertain, "Uncertain")]
|
||||
[InlineData(Quality.Bad, "Bad")]
|
||||
[InlineData(Quality.Unspecified, "Unknown")]
|
||||
public void MapQuality_AllValues(Quality input, string expected)
|
||||
{
|
||||
var result = SiteStreamGrpcClient.MapQuality(input);
|
||||
Assert.Equal(expected, result);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData(AlarmStateEnum.AlarmStateNormal, AlarmState.Normal)]
|
||||
[InlineData(AlarmStateEnum.AlarmStateActive, AlarmState.Active)]
|
||||
[InlineData(AlarmStateEnum.AlarmStateUnspecified, AlarmState.Normal)]
|
||||
public void MapAlarmState_AllValues(AlarmStateEnum input, AlarmState expected)
|
||||
{
|
||||
var result = SiteStreamGrpcClient.MapAlarmState(input);
|
||||
Assert.Equal(expected, result);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Unsubscribe_CancelsSubscription()
|
||||
{
|
||||
// We can't easily test the full Subscribe flow without a real gRPC server,
|
||||
// but we can test the Unsubscribe path by registering a CTS directly.
|
||||
// Use the internal AddSubscription helper for testability.
|
||||
var client = SiteStreamGrpcClient.CreateForTesting();
|
||||
|
||||
var cts = new CancellationTokenSource();
|
||||
client.AddSubscriptionForTesting("corr-test", cts);
|
||||
|
||||
Assert.False(cts.IsCancellationRequested);
|
||||
|
||||
client.Unsubscribe("corr-test");
|
||||
|
||||
Assert.True(cts.IsCancellationRequested);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Unsubscribe_NonExistent_DoesNotThrow()
|
||||
{
|
||||
var client = SiteStreamGrpcClient.CreateForTesting();
|
||||
client.Unsubscribe("does-not-exist"); // Should not throw
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task DisposeAsync_CancelsAllSubscriptions()
|
||||
{
|
||||
var client = SiteStreamGrpcClient.CreateForTesting();
|
||||
|
||||
var cts1 = new CancellationTokenSource();
|
||||
var cts2 = new CancellationTokenSource();
|
||||
client.AddSubscriptionForTesting("corr-1", cts1);
|
||||
client.AddSubscriptionForTesting("corr-2", cts2);
|
||||
|
||||
await client.DisposeAsync();
|
||||
|
||||
Assert.True(cts1.IsCancellationRequested);
|
||||
Assert.True(cts2.IsCancellationRequested);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user