Compare commits
2 Commits
55a05914d0
...
2cd43b6992
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2cd43b6992 | ||
|
|
25a6022f7b |
@@ -2,16 +2,18 @@ using Akka.Actor;
|
|||||||
using Akka.Event;
|
using Akka.Event;
|
||||||
using ScadaLink.Commons.Messages.DebugView;
|
using ScadaLink.Commons.Messages.DebugView;
|
||||||
using ScadaLink.Commons.Messages.Streaming;
|
using ScadaLink.Commons.Messages.Streaming;
|
||||||
|
using ScadaLink.Communication.Grpc;
|
||||||
|
|
||||||
namespace ScadaLink.Communication.Actors;
|
namespace ScadaLink.Communication.Actors;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Persistent actor (one per active debug session) on the central side.
|
/// Persistent actor (one per active debug session) on the central side.
|
||||||
/// Sends SubscribeDebugViewRequest to the site via CentralCommunicationActor (with THIS actor
|
/// Sends SubscribeDebugViewRequest to the site via CentralCommunicationActor (with THIS actor
|
||||||
/// as the Sender), so the site's InstanceActor registers this actor as the debug subscriber.
|
/// as the Sender) to get the initial snapshot. After receiving the snapshot, opens a gRPC
|
||||||
/// Stream events flow back via Akka remoting and are forwarded to the consumer via callbacks.
|
/// server-streaming subscription via SiteStreamGrpcClient for ongoing events.
|
||||||
|
/// Stream events are marshalled back to the actor via Self.Tell for thread safety.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public class DebugStreamBridgeActor : ReceiveActor
|
public class DebugStreamBridgeActor : ReceiveActor, IWithTimers
|
||||||
{
|
{
|
||||||
private readonly ILoggingAdapter _log = Context.GetLogger();
|
private readonly ILoggingAdapter _log = Context.GetLogger();
|
||||||
private readonly string _siteIdentifier;
|
private readonly string _siteIdentifier;
|
||||||
@@ -20,6 +22,19 @@ public class DebugStreamBridgeActor : ReceiveActor
|
|||||||
private readonly IActorRef _centralCommunicationActor;
|
private readonly IActorRef _centralCommunicationActor;
|
||||||
private readonly Action<object> _onEvent;
|
private readonly Action<object> _onEvent;
|
||||||
private readonly Action _onTerminated;
|
private readonly Action _onTerminated;
|
||||||
|
private readonly SiteStreamGrpcClientFactory _grpcFactory;
|
||||||
|
private readonly string _grpcNodeAAddress;
|
||||||
|
private readonly string _grpcNodeBAddress;
|
||||||
|
|
||||||
|
private const int MaxRetries = 3;
|
||||||
|
private const string ReconnectTimerKey = "grpc-reconnect";
|
||||||
|
internal static TimeSpan ReconnectDelay { get; set; } = TimeSpan.FromSeconds(5);
|
||||||
|
private int _retryCount;
|
||||||
|
private bool _useNodeA = true;
|
||||||
|
private bool _stopped;
|
||||||
|
private CancellationTokenSource? _grpcCts;
|
||||||
|
|
||||||
|
public ITimerScheduler Timers { get; set; } = null!;
|
||||||
|
|
||||||
public DebugStreamBridgeActor(
|
public DebugStreamBridgeActor(
|
||||||
string siteIdentifier,
|
string siteIdentifier,
|
||||||
@@ -27,7 +42,10 @@ public class DebugStreamBridgeActor : ReceiveActor
|
|||||||
string correlationId,
|
string correlationId,
|
||||||
IActorRef centralCommunicationActor,
|
IActorRef centralCommunicationActor,
|
||||||
Action<object> onEvent,
|
Action<object> onEvent,
|
||||||
Action onTerminated)
|
Action onTerminated,
|
||||||
|
SiteStreamGrpcClientFactory grpcFactory,
|
||||||
|
string grpcNodeAAddress,
|
||||||
|
string grpcNodeBAddress)
|
||||||
{
|
{
|
||||||
_siteIdentifier = siteIdentifier;
|
_siteIdentifier = siteIdentifier;
|
||||||
_instanceUniqueName = instanceUniqueName;
|
_instanceUniqueName = instanceUniqueName;
|
||||||
@@ -35,31 +53,58 @@ public class DebugStreamBridgeActor : ReceiveActor
|
|||||||
_centralCommunicationActor = centralCommunicationActor;
|
_centralCommunicationActor = centralCommunicationActor;
|
||||||
_onEvent = onEvent;
|
_onEvent = onEvent;
|
||||||
_onTerminated = onTerminated;
|
_onTerminated = onTerminated;
|
||||||
|
_grpcFactory = grpcFactory;
|
||||||
|
_grpcNodeAAddress = grpcNodeAAddress;
|
||||||
|
_grpcNodeBAddress = grpcNodeBAddress;
|
||||||
|
|
||||||
// Initial snapshot response from the site
|
// Initial snapshot response from the site (via ClusterClient)
|
||||||
Receive<DebugViewSnapshot>(snapshot =>
|
Receive<DebugViewSnapshot>(snapshot =>
|
||||||
{
|
{
|
||||||
_log.Info("Received initial snapshot for {0} ({1} attrs, {2} alarms)",
|
_log.Info("Received initial snapshot for {0} ({1} attrs, {2} alarms)",
|
||||||
_instanceUniqueName, snapshot.AttributeValues.Count, snapshot.AlarmStates.Count);
|
_instanceUniqueName, snapshot.AttributeValues.Count, snapshot.AlarmStates.Count);
|
||||||
_onEvent(snapshot);
|
_onEvent(snapshot);
|
||||||
|
OpenGrpcStream();
|
||||||
});
|
});
|
||||||
|
|
||||||
// Ongoing stream events from the site's InstanceActor
|
// Domain events arriving via Self.Tell from gRPC callback
|
||||||
Receive<AttributeValueChanged>(changed => _onEvent(changed));
|
Receive<AttributeValueChanged>(changed =>
|
||||||
Receive<AlarmStateChanged>(changed => _onEvent(changed));
|
{
|
||||||
|
_retryCount = 0; // Successful event resets retry count
|
||||||
|
_onEvent(changed);
|
||||||
|
});
|
||||||
|
Receive<AlarmStateChanged>(changed =>
|
||||||
|
{
|
||||||
|
_retryCount = 0;
|
||||||
|
_onEvent(changed);
|
||||||
|
});
|
||||||
|
|
||||||
|
// gRPC stream error — attempt reconnection
|
||||||
|
Receive<GrpcStreamError>(msg =>
|
||||||
|
{
|
||||||
|
_log.Warning("gRPC stream error for {0}: {1}", _instanceUniqueName, msg.Exception.Message);
|
||||||
|
HandleGrpcError();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Scheduled reconnection
|
||||||
|
Receive<ReconnectGrpcStream>(_ => OpenGrpcStream());
|
||||||
|
|
||||||
// Consumer requests stop
|
// Consumer requests stop
|
||||||
Receive<StopDebugStream>(_ =>
|
Receive<StopDebugStream>(_ =>
|
||||||
{
|
{
|
||||||
_log.Info("Stopping debug stream for {0}", _instanceUniqueName);
|
_log.Info("Stopping debug stream for {0}", _instanceUniqueName);
|
||||||
|
CleanupGrpc();
|
||||||
SendUnsubscribe();
|
SendUnsubscribe();
|
||||||
|
_stopped = true;
|
||||||
Context.Stop(Self);
|
Context.Stop(Self);
|
||||||
});
|
});
|
||||||
|
|
||||||
// Site disconnected — CentralCommunicationActor notifies us
|
// Site disconnected — CentralCommunicationActor notifies us
|
||||||
Receive<DebugStreamTerminated>(msg =>
|
Receive<DebugStreamTerminated>(msg =>
|
||||||
{
|
{
|
||||||
|
if (_stopped) return; // Idempotent — gRPC error may arrive simultaneously
|
||||||
_log.Warning("Debug stream terminated for {0} (site {1} disconnected)", _instanceUniqueName, msg.SiteId);
|
_log.Warning("Debug stream terminated for {0} (site {1} disconnected)", _instanceUniqueName, msg.SiteId);
|
||||||
|
CleanupGrpc();
|
||||||
|
_stopped = true;
|
||||||
_onTerminated();
|
_onTerminated();
|
||||||
Context.Stop(Self);
|
Context.Stop(Self);
|
||||||
});
|
});
|
||||||
@@ -69,7 +114,9 @@ public class DebugStreamBridgeActor : ReceiveActor
|
|||||||
Receive<ReceiveTimeout>(_ =>
|
Receive<ReceiveTimeout>(_ =>
|
||||||
{
|
{
|
||||||
_log.Warning("Debug stream for {0} timed out (orphaned session), stopping", _instanceUniqueName);
|
_log.Warning("Debug stream for {0} timed out (orphaned session), stopping", _instanceUniqueName);
|
||||||
|
CleanupGrpc();
|
||||||
SendUnsubscribe();
|
SendUnsubscribe();
|
||||||
|
_stopped = true;
|
||||||
_onTerminated();
|
_onTerminated();
|
||||||
Context.Stop(Self);
|
Context.Stop(Self);
|
||||||
});
|
});
|
||||||
@@ -79,13 +126,88 @@ public class DebugStreamBridgeActor : ReceiveActor
|
|||||||
{
|
{
|
||||||
_log.Info("Starting debug stream bridge for {0} on site {1}", _instanceUniqueName, _siteIdentifier);
|
_log.Info("Starting debug stream bridge for {0} on site {1}", _instanceUniqueName, _siteIdentifier);
|
||||||
|
|
||||||
// Send subscribe request via CentralCommunicationActor.
|
// Send subscribe request via CentralCommunicationActor for the initial snapshot.
|
||||||
// THIS actor is the Sender, so the site's InstanceActor registers us as the subscriber.
|
|
||||||
var request = new SubscribeDebugViewRequest(_instanceUniqueName, _correlationId);
|
var request = new SubscribeDebugViewRequest(_instanceUniqueName, _correlationId);
|
||||||
var envelope = new SiteEnvelope(_siteIdentifier, request);
|
var envelope = new SiteEnvelope(_siteIdentifier, request);
|
||||||
_centralCommunicationActor.Tell(envelope, Self);
|
_centralCommunicationActor.Tell(envelope, Self);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected override void PostStop()
|
||||||
|
{
|
||||||
|
_grpcCts?.Cancel();
|
||||||
|
_grpcCts?.Dispose();
|
||||||
|
_grpcCts = null;
|
||||||
|
base.PostStop();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void OpenGrpcStream()
|
||||||
|
{
|
||||||
|
if (_stopped) return;
|
||||||
|
|
||||||
|
var endpoint = _useNodeA ? _grpcNodeAAddress : _grpcNodeBAddress;
|
||||||
|
_log.Info("Opening gRPC stream for {0} to {1}", _instanceUniqueName, endpoint);
|
||||||
|
|
||||||
|
_grpcCts?.Cancel();
|
||||||
|
_grpcCts?.Dispose();
|
||||||
|
_grpcCts = new CancellationTokenSource();
|
||||||
|
|
||||||
|
var client = _grpcFactory.GetOrCreate(_siteIdentifier, endpoint);
|
||||||
|
var self = Self;
|
||||||
|
var ct = _grpcCts.Token;
|
||||||
|
|
||||||
|
// Launch as background task — onEvent and onError marshal back to actor via Tell
|
||||||
|
Task.Run(async () =>
|
||||||
|
{
|
||||||
|
await client.SubscribeAsync(
|
||||||
|
_correlationId,
|
||||||
|
_instanceUniqueName,
|
||||||
|
evt => self.Tell(evt),
|
||||||
|
ex => self.Tell(new GrpcStreamError(ex)),
|
||||||
|
ct);
|
||||||
|
}, ct);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void HandleGrpcError()
|
||||||
|
{
|
||||||
|
if (_stopped) return;
|
||||||
|
|
||||||
|
_retryCount++;
|
||||||
|
|
||||||
|
if (_retryCount > MaxRetries)
|
||||||
|
{
|
||||||
|
_log.Error("gRPC stream for {0} exceeded max retries ({1}), terminating", _instanceUniqueName, MaxRetries);
|
||||||
|
CleanupGrpc();
|
||||||
|
_stopped = true;
|
||||||
|
_onTerminated();
|
||||||
|
Context.Stop(Self);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Flip to the other node
|
||||||
|
_useNodeA = !_useNodeA;
|
||||||
|
|
||||||
|
// First retry is immediate, subsequent retries use a short backoff
|
||||||
|
if (_retryCount == 1)
|
||||||
|
{
|
||||||
|
Self.Tell(new ReconnectGrpcStream());
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
Timers.StartSingleTimer(ReconnectTimerKey, new ReconnectGrpcStream(), ReconnectDelay);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void CleanupGrpc()
|
||||||
|
{
|
||||||
|
_grpcCts?.Cancel();
|
||||||
|
_grpcCts?.Dispose();
|
||||||
|
_grpcCts = null;
|
||||||
|
|
||||||
|
var client = _grpcFactory.GetOrCreate(_siteIdentifier,
|
||||||
|
_useNodeA ? _grpcNodeAAddress : _grpcNodeBAddress);
|
||||||
|
client.Unsubscribe(_correlationId);
|
||||||
|
}
|
||||||
|
|
||||||
private void SendUnsubscribe()
|
private void SendUnsubscribe()
|
||||||
{
|
{
|
||||||
var request = new UnsubscribeDebugViewRequest(_instanceUniqueName, _correlationId);
|
var request = new UnsubscribeDebugViewRequest(_instanceUniqueName, _correlationId);
|
||||||
@@ -98,3 +220,13 @@ public class DebugStreamBridgeActor : ReceiveActor
|
|||||||
/// Message sent to a DebugStreamBridgeActor to stop the debug stream session.
|
/// Message sent to a DebugStreamBridgeActor to stop the debug stream session.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public record StopDebugStream;
|
public record StopDebugStream;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Internal message indicating a gRPC stream error occurred.
|
||||||
|
/// </summary>
|
||||||
|
internal record GrpcStreamError(Exception Exception);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Internal message to trigger gRPC stream reconnection.
|
||||||
|
/// </summary>
|
||||||
|
internal record ReconnectGrpcStream;
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ using Microsoft.Extensions.Logging;
|
|||||||
using ScadaLink.Commons.Interfaces.Repositories;
|
using ScadaLink.Commons.Interfaces.Repositories;
|
||||||
using ScadaLink.Commons.Messages.DebugView;
|
using ScadaLink.Commons.Messages.DebugView;
|
||||||
using ScadaLink.Communication.Actors;
|
using ScadaLink.Communication.Actors;
|
||||||
|
using ScadaLink.Communication.Grpc;
|
||||||
|
|
||||||
namespace ScadaLink.Communication;
|
namespace ScadaLink.Communication;
|
||||||
|
|
||||||
@@ -17,6 +18,7 @@ public class DebugStreamService
|
|||||||
{
|
{
|
||||||
private readonly CommunicationService _communicationService;
|
private readonly CommunicationService _communicationService;
|
||||||
private readonly IServiceProvider _serviceProvider;
|
private readonly IServiceProvider _serviceProvider;
|
||||||
|
private readonly SiteStreamGrpcClientFactory _grpcClientFactory;
|
||||||
private readonly ILogger<DebugStreamService> _logger;
|
private readonly ILogger<DebugStreamService> _logger;
|
||||||
private readonly ConcurrentDictionary<string, IActorRef> _sessions = new();
|
private readonly ConcurrentDictionary<string, IActorRef> _sessions = new();
|
||||||
private ActorSystem? _actorSystem;
|
private ActorSystem? _actorSystem;
|
||||||
@@ -24,10 +26,12 @@ public class DebugStreamService
|
|||||||
public DebugStreamService(
|
public DebugStreamService(
|
||||||
CommunicationService communicationService,
|
CommunicationService communicationService,
|
||||||
IServiceProvider serviceProvider,
|
IServiceProvider serviceProvider,
|
||||||
|
SiteStreamGrpcClientFactory grpcClientFactory,
|
||||||
ILogger<DebugStreamService> logger)
|
ILogger<DebugStreamService> logger)
|
||||||
{
|
{
|
||||||
_communicationService = communicationService;
|
_communicationService = communicationService;
|
||||||
_serviceProvider = serviceProvider;
|
_serviceProvider = serviceProvider;
|
||||||
|
_grpcClientFactory = grpcClientFactory;
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -56,6 +60,8 @@ public class DebugStreamService
|
|||||||
// Resolve instance → unique name + site
|
// Resolve instance → unique name + site
|
||||||
string instanceUniqueName;
|
string instanceUniqueName;
|
||||||
string siteIdentifier;
|
string siteIdentifier;
|
||||||
|
string grpcNodeAAddress;
|
||||||
|
string grpcNodeBAddress;
|
||||||
|
|
||||||
using (var scope = _serviceProvider.CreateScope())
|
using (var scope = _serviceProvider.CreateScope())
|
||||||
{
|
{
|
||||||
@@ -69,6 +75,10 @@ public class DebugStreamService
|
|||||||
|
|
||||||
instanceUniqueName = instance.UniqueName;
|
instanceUniqueName = instance.UniqueName;
|
||||||
siteIdentifier = site.SiteIdentifier;
|
siteIdentifier = site.SiteIdentifier;
|
||||||
|
grpcNodeAAddress = site.GrpcNodeAAddress
|
||||||
|
?? throw new InvalidOperationException($"Site {siteIdentifier} has no GrpcNodeAAddress configured.");
|
||||||
|
grpcNodeBAddress = site.GrpcNodeBAddress
|
||||||
|
?? throw new InvalidOperationException($"Site {siteIdentifier} has no GrpcNodeBAddress configured.");
|
||||||
}
|
}
|
||||||
|
|
||||||
var sessionId = Guid.NewGuid().ToString("N");
|
var sessionId = Guid.NewGuid().ToString("N");
|
||||||
@@ -104,7 +114,10 @@ public class DebugStreamService
|
|||||||
sessionId,
|
sessionId,
|
||||||
commActor,
|
commActor,
|
||||||
onEventWrapper,
|
onEventWrapper,
|
||||||
onTerminatedWrapper);
|
onTerminatedWrapper,
|
||||||
|
_grpcClientFactory,
|
||||||
|
grpcNodeAAddress,
|
||||||
|
grpcNodeBAddress);
|
||||||
|
|
||||||
var bridgeActor = system.ActorOf(props, $"debug-stream-{sessionId}");
|
var bridgeActor = system.ActorOf(props, $"debug-stream-{sessionId}");
|
||||||
|
|
||||||
|
|||||||
179
src/ScadaLink.Communication/Grpc/SiteStreamGrpcClient.cs
Normal file
179
src/ScadaLink.Communication/Grpc/SiteStreamGrpcClient.cs
Normal file
@@ -0,0 +1,179 @@
|
|||||||
|
using System.Collections.Concurrent;
|
||||||
|
using Grpc.Core;
|
||||||
|
using Grpc.Net.Client;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using ScadaLink.Commons.Messages.Streaming;
|
||||||
|
using ScadaLink.Commons.Types.Enums;
|
||||||
|
using Google.Protobuf.WellKnownTypes;
|
||||||
|
|
||||||
|
namespace ScadaLink.Communication.Grpc;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Per-site gRPC client that manages streaming subscriptions to a site's
|
||||||
|
/// SiteStreamGrpcServer. The central-side DebugStreamBridgeActor uses this
|
||||||
|
/// to open server-streaming calls for individual instances.
|
||||||
|
/// </summary>
|
||||||
|
public class SiteStreamGrpcClient : IAsyncDisposable
|
||||||
|
{
|
||||||
|
private readonly GrpcChannel? _channel;
|
||||||
|
private readonly SiteStreamService.SiteStreamServiceClient? _client;
|
||||||
|
private readonly ILogger? _logger;
|
||||||
|
private readonly ConcurrentDictionary<string, CancellationTokenSource> _subscriptions = new();
|
||||||
|
|
||||||
|
public SiteStreamGrpcClient(string endpoint, ILogger logger)
|
||||||
|
{
|
||||||
|
_channel = GrpcChannel.ForAddress(endpoint, new GrpcChannelOptions
|
||||||
|
{
|
||||||
|
HttpHandler = new SocketsHttpHandler
|
||||||
|
{
|
||||||
|
KeepAlivePingDelay = TimeSpan.FromSeconds(15),
|
||||||
|
KeepAlivePingTimeout = TimeSpan.FromSeconds(10),
|
||||||
|
KeepAlivePingPolicy = HttpKeepAlivePingPolicy.Always
|
||||||
|
}
|
||||||
|
});
|
||||||
|
_client = new SiteStreamService.SiteStreamServiceClient(_channel);
|
||||||
|
_logger = logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Protected constructor for unit testing without a real gRPC channel.
|
||||||
|
/// Allows subclassing for mock implementations.
|
||||||
|
/// </summary>
|
||||||
|
protected SiteStreamGrpcClient()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Creates a test-only instance that has no gRPC channel. Used to test
|
||||||
|
/// Unsubscribe and Dispose behavior without needing a real endpoint.
|
||||||
|
/// </summary>
|
||||||
|
internal static SiteStreamGrpcClient CreateForTesting() => new();
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Registers a CancellationTokenSource for a correlation ID. Test-only.
|
||||||
|
/// </summary>
|
||||||
|
internal void AddSubscriptionForTesting(string correlationId, CancellationTokenSource cts)
|
||||||
|
{
|
||||||
|
_subscriptions[correlationId] = cts;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Opens a server-streaming subscription for a specific instance.
|
||||||
|
/// This is a long-running async method; the caller launches it as a background task.
|
||||||
|
/// The <paramref name="onEvent"/> callback delivers domain events, and
|
||||||
|
/// <paramref name="onError"/> lets the caller handle reconnection.
|
||||||
|
/// </summary>
|
||||||
|
public virtual async Task SubscribeAsync(
|
||||||
|
string correlationId,
|
||||||
|
string instanceUniqueName,
|
||||||
|
Action<object> onEvent,
|
||||||
|
Action<Exception> onError,
|
||||||
|
CancellationToken ct)
|
||||||
|
{
|
||||||
|
if (_client is null)
|
||||||
|
throw new InvalidOperationException("Cannot subscribe on a test-only client.");
|
||||||
|
|
||||||
|
var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
|
||||||
|
_subscriptions[correlationId] = cts;
|
||||||
|
|
||||||
|
var request = new InstanceStreamRequest
|
||||||
|
{
|
||||||
|
CorrelationId = correlationId,
|
||||||
|
InstanceUniqueName = instanceUniqueName
|
||||||
|
};
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
using var call = _client.SubscribeInstance(request, cancellationToken: cts.Token);
|
||||||
|
|
||||||
|
await foreach (var evt in call.ResponseStream.ReadAllAsync(cts.Token))
|
||||||
|
{
|
||||||
|
var domainEvent = ConvertToDomainEvent(evt);
|
||||||
|
if (domainEvent != null)
|
||||||
|
onEvent(domainEvent);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
|
||||||
|
{
|
||||||
|
// Normal cancellation — not an error
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
onError(ex);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_subscriptions.TryRemove(correlationId, out _);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Cancels an active subscription by correlation ID.
|
||||||
|
/// </summary>
|
||||||
|
public virtual void Unsubscribe(string correlationId)
|
||||||
|
{
|
||||||
|
if (_subscriptions.TryRemove(correlationId, out var cts))
|
||||||
|
{
|
||||||
|
cts.Cancel();
|
||||||
|
cts.Dispose();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Converts a proto SiteStreamEvent to the corresponding domain message.
|
||||||
|
/// Internal for testability.
|
||||||
|
/// </summary>
|
||||||
|
internal static object? ConvertToDomainEvent(SiteStreamEvent evt) => evt.EventCase switch
|
||||||
|
{
|
||||||
|
SiteStreamEvent.EventOneofCase.AttributeChanged => new AttributeValueChanged(
|
||||||
|
evt.AttributeChanged.InstanceUniqueName,
|
||||||
|
evt.AttributeChanged.AttributePath,
|
||||||
|
evt.AttributeChanged.AttributeName,
|
||||||
|
evt.AttributeChanged.Value,
|
||||||
|
MapQuality(evt.AttributeChanged.Quality),
|
||||||
|
evt.AttributeChanged.Timestamp.ToDateTimeOffset()),
|
||||||
|
SiteStreamEvent.EventOneofCase.AlarmChanged => new AlarmStateChanged(
|
||||||
|
evt.AlarmChanged.InstanceUniqueName,
|
||||||
|
evt.AlarmChanged.AlarmName,
|
||||||
|
MapAlarmState(evt.AlarmChanged.State),
|
||||||
|
evt.AlarmChanged.Priority,
|
||||||
|
evt.AlarmChanged.Timestamp.ToDateTimeOffset()),
|
||||||
|
_ => null
|
||||||
|
};
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Maps proto Quality enum to domain string. Internal for testability.
|
||||||
|
/// </summary>
|
||||||
|
internal static string MapQuality(Quality quality) => quality switch
|
||||||
|
{
|
||||||
|
Quality.Good => "Good",
|
||||||
|
Quality.Uncertain => "Uncertain",
|
||||||
|
Quality.Bad => "Bad",
|
||||||
|
_ => "Unknown"
|
||||||
|
};
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Maps proto AlarmStateEnum to domain AlarmState. Internal for testability.
|
||||||
|
/// </summary>
|
||||||
|
internal static AlarmState MapAlarmState(AlarmStateEnum state) => state switch
|
||||||
|
{
|
||||||
|
AlarmStateEnum.AlarmStateNormal => AlarmState.Normal,
|
||||||
|
AlarmStateEnum.AlarmStateActive => AlarmState.Active,
|
||||||
|
_ => AlarmState.Normal
|
||||||
|
};
|
||||||
|
|
||||||
|
public async ValueTask DisposeAsync()
|
||||||
|
{
|
||||||
|
foreach (var cts in _subscriptions.Values)
|
||||||
|
{
|
||||||
|
cts.Cancel();
|
||||||
|
cts.Dispose();
|
||||||
|
}
|
||||||
|
_subscriptions.Clear();
|
||||||
|
|
||||||
|
if (_channel is not null)
|
||||||
|
_channel.Dispose();
|
||||||
|
|
||||||
|
await ValueTask.CompletedTask;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,57 @@
|
|||||||
|
using System.Collections.Concurrent;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
|
||||||
|
namespace ScadaLink.Communication.Grpc;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Caches one <see cref="SiteStreamGrpcClient"/> per site identifier.
|
||||||
|
/// The DebugStreamBridgeActor uses this factory to obtain (or create) a
|
||||||
|
/// gRPC client for a given site before opening a streaming subscription.
|
||||||
|
/// </summary>
|
||||||
|
public class SiteStreamGrpcClientFactory : IAsyncDisposable, IDisposable
|
||||||
|
{
|
||||||
|
private readonly ConcurrentDictionary<string, SiteStreamGrpcClient> _clients = new();
|
||||||
|
private readonly ILoggerFactory _loggerFactory;
|
||||||
|
|
||||||
|
public SiteStreamGrpcClientFactory(ILoggerFactory loggerFactory)
|
||||||
|
{
|
||||||
|
_loggerFactory = loggerFactory;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Returns an existing client for the site or creates a new one.
|
||||||
|
/// </summary>
|
||||||
|
public virtual SiteStreamGrpcClient GetOrCreate(string siteIdentifier, string grpcEndpoint)
|
||||||
|
{
|
||||||
|
return _clients.GetOrAdd(siteIdentifier, _ =>
|
||||||
|
{
|
||||||
|
var logger = _loggerFactory.CreateLogger<SiteStreamGrpcClient>();
|
||||||
|
return new SiteStreamGrpcClient(grpcEndpoint, logger);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Removes and disposes the client for the given site.
|
||||||
|
/// </summary>
|
||||||
|
public async Task RemoveSiteAsync(string siteIdentifier)
|
||||||
|
{
|
||||||
|
if (_clients.TryRemove(siteIdentifier, out var client))
|
||||||
|
{
|
||||||
|
await client.DisposeAsync();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async ValueTask DisposeAsync()
|
||||||
|
{
|
||||||
|
foreach (var client in _clients.Values)
|
||||||
|
{
|
||||||
|
await client.DisposeAsync();
|
||||||
|
}
|
||||||
|
_clients.Clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Dispose()
|
||||||
|
{
|
||||||
|
DisposeAsync().AsTask().GetAwaiter().GetResult();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -7,6 +7,10 @@
|
|||||||
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
|
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
|
||||||
</PropertyGroup>
|
</PropertyGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<InternalsVisibleTo Include="ScadaLink.Communication.Tests" />
|
||||||
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<FrameworkReference Include="Microsoft.AspNetCore.App" />
|
<FrameworkReference Include="Microsoft.AspNetCore.App" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
|
using ScadaLink.Communication.Grpc;
|
||||||
|
|
||||||
namespace ScadaLink.Communication;
|
namespace ScadaLink.Communication;
|
||||||
|
|
||||||
@@ -10,6 +11,7 @@ public static class ServiceCollectionExtensions
|
|||||||
.BindConfiguration("Communication");
|
.BindConfiguration("Communication");
|
||||||
|
|
||||||
services.AddSingleton<CommunicationService>();
|
services.AddSingleton<CommunicationService>();
|
||||||
|
services.AddSingleton<SiteStreamGrpcClientFactory>();
|
||||||
services.AddSingleton<DebugStreamService>();
|
services.AddSingleton<DebugStreamService>();
|
||||||
|
|
||||||
return services;
|
return services;
|
||||||
|
|||||||
@@ -0,0 +1,339 @@
|
|||||||
|
using Akka.Actor;
|
||||||
|
using Akka.TestKit;
|
||||||
|
using Akka.TestKit.Xunit2;
|
||||||
|
using ScadaLink.Commons.Messages.DebugView;
|
||||||
|
using ScadaLink.Commons.Messages.Streaming;
|
||||||
|
using ScadaLink.Communication.Actors;
|
||||||
|
using ScadaLink.Communication.Grpc;
|
||||||
|
|
||||||
|
namespace ScadaLink.Communication.Tests.Grpc;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Tests for DebugStreamBridgeActor with gRPC streaming integration.
|
||||||
|
/// </summary>
|
||||||
|
public class DebugStreamBridgeActorTests : TestKit
|
||||||
|
{
|
||||||
|
private const string SiteId = "site-alpha";
|
||||||
|
private const string InstanceName = "Site1.Pump01";
|
||||||
|
private const string GrpcNodeA = "http://localhost:5100";
|
||||||
|
private const string GrpcNodeB = "http://localhost:5200";
|
||||||
|
|
||||||
|
public DebugStreamBridgeActorTests() : base(@"akka.loglevel = DEBUG")
|
||||||
|
{
|
||||||
|
// Use a very short reconnect delay for testing
|
||||||
|
DebugStreamBridgeActor.ReconnectDelay = TimeSpan.FromMilliseconds(100);
|
||||||
|
}
|
||||||
|
|
||||||
|
private record TestContext(
|
||||||
|
IActorRef BridgeActor,
|
||||||
|
TestProbe CommProbe,
|
||||||
|
MockSiteStreamGrpcClient MockGrpcClient,
|
||||||
|
List<object> ReceivedEvents,
|
||||||
|
bool[] TerminatedFlag);
|
||||||
|
|
||||||
|
private TestContext CreateBridgeActor()
|
||||||
|
{
|
||||||
|
var commProbe = CreateTestProbe();
|
||||||
|
var mockClient = new MockSiteStreamGrpcClient();
|
||||||
|
var factory = new MockSiteStreamGrpcClientFactory(mockClient);
|
||||||
|
var events = new List<object>();
|
||||||
|
var terminated = new[] { false };
|
||||||
|
|
||||||
|
Action<object> onEvent = evt => { lock (events) { events.Add(evt); } };
|
||||||
|
Action onTerminated = () => terminated[0] = true;
|
||||||
|
|
||||||
|
var props = Props.Create(typeof(DebugStreamBridgeActor),
|
||||||
|
SiteId,
|
||||||
|
InstanceName,
|
||||||
|
"corr-1",
|
||||||
|
commProbe.Ref,
|
||||||
|
onEvent,
|
||||||
|
onTerminated,
|
||||||
|
factory,
|
||||||
|
GrpcNodeA,
|
||||||
|
GrpcNodeB);
|
||||||
|
|
||||||
|
var actor = Sys.ActorOf(props);
|
||||||
|
return new TestContext(actor, commProbe, mockClient, events, terminated);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void PreStart_Sends_SubscribeDebugViewRequest_Via_ClusterClient()
|
||||||
|
{
|
||||||
|
var ctx = CreateBridgeActor();
|
||||||
|
|
||||||
|
var envelope = ctx.CommProbe.ExpectMsg<SiteEnvelope>();
|
||||||
|
Assert.Equal(SiteId, envelope.SiteId);
|
||||||
|
Assert.IsType<SubscribeDebugViewRequest>(envelope.Message);
|
||||||
|
|
||||||
|
var req = (SubscribeDebugViewRequest)envelope.Message;
|
||||||
|
Assert.Equal(InstanceName, req.InstanceUniqueName);
|
||||||
|
Assert.Equal("corr-1", req.CorrelationId);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void On_Snapshot_Forwards_To_OnEvent_Callback()
|
||||||
|
{
|
||||||
|
var ctx = CreateBridgeActor();
|
||||||
|
ctx.CommProbe.ExpectMsg<SiteEnvelope>();
|
||||||
|
|
||||||
|
var snapshot = new DebugViewSnapshot(
|
||||||
|
InstanceName,
|
||||||
|
new List<AttributeValueChanged>(),
|
||||||
|
new List<AlarmStateChanged>(),
|
||||||
|
DateTimeOffset.UtcNow);
|
||||||
|
|
||||||
|
ctx.BridgeActor.Tell(snapshot);
|
||||||
|
|
||||||
|
AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 1; } },
|
||||||
|
TimeSpan.FromSeconds(3));
|
||||||
|
lock (ctx.ReceivedEvents) { Assert.IsType<DebugViewSnapshot>(ctx.ReceivedEvents[0]); }
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void On_Snapshot_Opens_GrpcStream()
|
||||||
|
{
|
||||||
|
var ctx = CreateBridgeActor();
|
||||||
|
ctx.CommProbe.ExpectMsg<SiteEnvelope>();
|
||||||
|
|
||||||
|
var snapshot = new DebugViewSnapshot(
|
||||||
|
InstanceName,
|
||||||
|
new List<AttributeValueChanged>(),
|
||||||
|
new List<AlarmStateChanged>(),
|
||||||
|
DateTimeOffset.UtcNow);
|
||||||
|
|
||||||
|
ctx.BridgeActor.Tell(snapshot);
|
||||||
|
|
||||||
|
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
|
||||||
|
|
||||||
|
var call = ctx.MockGrpcClient.SubscribeCalls[0];
|
||||||
|
Assert.Equal("corr-1", call.CorrelationId);
|
||||||
|
Assert.Equal(InstanceName, call.InstanceUniqueName);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Events_From_GrpcCallback_Forwarded_To_OnEvent()
|
||||||
|
{
|
||||||
|
var ctx = CreateBridgeActor();
|
||||||
|
ctx.CommProbe.ExpectMsg<SiteEnvelope>();
|
||||||
|
|
||||||
|
var snapshot = new DebugViewSnapshot(
|
||||||
|
InstanceName,
|
||||||
|
new List<AttributeValueChanged>(),
|
||||||
|
new List<AlarmStateChanged>(),
|
||||||
|
DateTimeOffset.UtcNow);
|
||||||
|
|
||||||
|
ctx.BridgeActor.Tell(snapshot);
|
||||||
|
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
|
||||||
|
|
||||||
|
// Simulate gRPC event arriving via the onEvent callback
|
||||||
|
var attrChange = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", DateTimeOffset.UtcNow);
|
||||||
|
ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(attrChange);
|
||||||
|
|
||||||
|
// snapshot + attr change
|
||||||
|
AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } },
|
||||||
|
TimeSpan.FromSeconds(3));
|
||||||
|
lock (ctx.ReceivedEvents) { Assert.IsType<AttributeValueChanged>(ctx.ReceivedEvents[1]); }
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void On_GrpcError_Reconnects_To_Other_Node()
|
||||||
|
{
|
||||||
|
var ctx = CreateBridgeActor();
|
||||||
|
ctx.CommProbe.ExpectMsg<SiteEnvelope>();
|
||||||
|
|
||||||
|
var snapshot = new DebugViewSnapshot(
|
||||||
|
InstanceName,
|
||||||
|
new List<AttributeValueChanged>(),
|
||||||
|
new List<AlarmStateChanged>(),
|
||||||
|
DateTimeOffset.UtcNow);
|
||||||
|
|
||||||
|
ctx.BridgeActor.Tell(snapshot);
|
||||||
|
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
|
||||||
|
|
||||||
|
// Simulate gRPC error
|
||||||
|
ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("Stream broken"));
|
||||||
|
|
||||||
|
// Should resubscribe
|
||||||
|
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5));
|
||||||
|
Assert.Equal("corr-1", ctx.MockGrpcClient.SubscribeCalls[1].CorrelationId);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void After_MaxRetries_Terminates()
|
||||||
|
{
|
||||||
|
var ctx = CreateBridgeActor();
|
||||||
|
ctx.CommProbe.ExpectMsg<SiteEnvelope>();
|
||||||
|
|
||||||
|
var snapshot = new DebugViewSnapshot(
|
||||||
|
InstanceName,
|
||||||
|
new List<AttributeValueChanged>(),
|
||||||
|
new List<AlarmStateChanged>(),
|
||||||
|
DateTimeOffset.UtcNow);
|
||||||
|
|
||||||
|
Watch(ctx.BridgeActor);
|
||||||
|
ctx.BridgeActor.Tell(snapshot);
|
||||||
|
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
|
||||||
|
|
||||||
|
// 4 consecutive errors: initial + 3 retries, then terminate
|
||||||
|
ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("Error 1"));
|
||||||
|
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5));
|
||||||
|
|
||||||
|
ctx.MockGrpcClient.SubscribeCalls[1].OnError(new Exception("Error 2"));
|
||||||
|
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 3, TimeSpan.FromSeconds(5));
|
||||||
|
|
||||||
|
ctx.MockGrpcClient.SubscribeCalls[2].OnError(new Exception("Error 3"));
|
||||||
|
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 4, TimeSpan.FromSeconds(5));
|
||||||
|
|
||||||
|
// Fourth error exceeds max retries
|
||||||
|
ctx.MockGrpcClient.SubscribeCalls[3].OnError(new Exception("Error 4"));
|
||||||
|
|
||||||
|
ExpectTerminated(ctx.BridgeActor, TimeSpan.FromSeconds(5));
|
||||||
|
Assert.True(ctx.TerminatedFlag[0]);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void StopDebugStream_Cancels_Grpc_And_Sends_Unsubscribe()
|
||||||
|
{
|
||||||
|
var ctx = CreateBridgeActor();
|
||||||
|
ctx.CommProbe.ExpectMsg<SiteEnvelope>(); // subscribe
|
||||||
|
|
||||||
|
var snapshot = new DebugViewSnapshot(
|
||||||
|
InstanceName,
|
||||||
|
new List<AttributeValueChanged>(),
|
||||||
|
new List<AlarmStateChanged>(),
|
||||||
|
DateTimeOffset.UtcNow);
|
||||||
|
|
||||||
|
Watch(ctx.BridgeActor);
|
||||||
|
ctx.BridgeActor.Tell(snapshot);
|
||||||
|
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
|
||||||
|
|
||||||
|
ctx.BridgeActor.Tell(new StopDebugStream());
|
||||||
|
|
||||||
|
// Should send ClusterClient unsubscribe
|
||||||
|
var envelope = ctx.CommProbe.ExpectMsg<SiteEnvelope>();
|
||||||
|
Assert.IsType<UnsubscribeDebugViewRequest>(envelope.Message);
|
||||||
|
|
||||||
|
// Should unsubscribe gRPC
|
||||||
|
AwaitCondition(() => ctx.MockGrpcClient.UnsubscribedCorrelationIds.Count > 0, TimeSpan.FromSeconds(3));
|
||||||
|
Assert.Contains("corr-1", ctx.MockGrpcClient.UnsubscribedCorrelationIds);
|
||||||
|
|
||||||
|
// Should stop self
|
||||||
|
ExpectTerminated(ctx.BridgeActor);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void DebugStreamTerminated_Stops_Actor_Idempotently()
|
||||||
|
{
|
||||||
|
var ctx = CreateBridgeActor();
|
||||||
|
ctx.CommProbe.ExpectMsg<SiteEnvelope>();
|
||||||
|
|
||||||
|
Watch(ctx.BridgeActor);
|
||||||
|
ctx.BridgeActor.Tell(new DebugStreamTerminated(SiteId, "corr-1"));
|
||||||
|
|
||||||
|
ExpectTerminated(ctx.BridgeActor);
|
||||||
|
Assert.True(ctx.TerminatedFlag[0]);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Grpc_Error_Resets_RetryCount_On_Successful_Event()
|
||||||
|
{
|
||||||
|
var ctx = CreateBridgeActor();
|
||||||
|
ctx.CommProbe.ExpectMsg<SiteEnvelope>();
|
||||||
|
|
||||||
|
var snapshot = new DebugViewSnapshot(
|
||||||
|
InstanceName,
|
||||||
|
new List<AttributeValueChanged>(),
|
||||||
|
new List<AlarmStateChanged>(),
|
||||||
|
DateTimeOffset.UtcNow);
|
||||||
|
|
||||||
|
ctx.BridgeActor.Tell(snapshot);
|
||||||
|
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
|
||||||
|
|
||||||
|
// First error → retry 1
|
||||||
|
ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("Error 1"));
|
||||||
|
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5));
|
||||||
|
|
||||||
|
// Simulate successful event (resets retry count)
|
||||||
|
var attrChange = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", DateTimeOffset.UtcNow);
|
||||||
|
ctx.MockGrpcClient.SubscribeCalls[1].OnEvent(attrChange);
|
||||||
|
AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } },
|
||||||
|
TimeSpan.FromSeconds(3));
|
||||||
|
|
||||||
|
// Now another 3 errors should be tolerated (retry count was reset)
|
||||||
|
ctx.MockGrpcClient.SubscribeCalls[1].OnError(new Exception("Error 2"));
|
||||||
|
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 3, TimeSpan.FromSeconds(5));
|
||||||
|
|
||||||
|
ctx.MockGrpcClient.SubscribeCalls[2].OnError(new Exception("Error 3"));
|
||||||
|
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 4, TimeSpan.FromSeconds(5));
|
||||||
|
|
||||||
|
ctx.MockGrpcClient.SubscribeCalls[3].OnError(new Exception("Error 4"));
|
||||||
|
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 5, TimeSpan.FromSeconds(5));
|
||||||
|
|
||||||
|
// Still alive — 3 retries from the second failure point succeeded
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Mock gRPC client that records SubscribeAsync and Unsubscribe calls.
|
||||||
|
/// </summary>
|
||||||
|
internal class MockSiteStreamGrpcClient : SiteStreamGrpcClient
|
||||||
|
{
|
||||||
|
public List<MockSubscription> SubscribeCalls { get; } = new();
|
||||||
|
public List<string> UnsubscribedCorrelationIds { get; } = new();
|
||||||
|
|
||||||
|
private MockSiteStreamGrpcClient(bool _) : base() { }
|
||||||
|
|
||||||
|
public MockSiteStreamGrpcClient() : base()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
public override Task SubscribeAsync(
|
||||||
|
string correlationId,
|
||||||
|
string instanceUniqueName,
|
||||||
|
Action<object> onEvent,
|
||||||
|
Action<Exception> onError,
|
||||||
|
CancellationToken ct)
|
||||||
|
{
|
||||||
|
var subscription = new MockSubscription(correlationId, instanceUniqueName, onEvent, onError, ct);
|
||||||
|
SubscribeCalls.Add(subscription);
|
||||||
|
|
||||||
|
// Return a task that completes when cancelled (simulates long-running stream)
|
||||||
|
var tcs = new TaskCompletionSource();
|
||||||
|
ct.Register(() => tcs.TrySetResult());
|
||||||
|
return tcs.Task;
|
||||||
|
}
|
||||||
|
|
||||||
|
public override void Unsubscribe(string correlationId)
|
||||||
|
{
|
||||||
|
UnsubscribedCorrelationIds.Add(correlationId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
internal record MockSubscription(
|
||||||
|
string CorrelationId,
|
||||||
|
string InstanceUniqueName,
|
||||||
|
Action<object> OnEvent,
|
||||||
|
Action<Exception> OnError,
|
||||||
|
CancellationToken CancellationToken);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Factory that always returns the pre-configured mock client.
|
||||||
|
/// </summary>
|
||||||
|
internal class MockSiteStreamGrpcClientFactory : SiteStreamGrpcClientFactory
|
||||||
|
{
|
||||||
|
private readonly MockSiteStreamGrpcClient _mockClient;
|
||||||
|
public List<string> RequestedEndpoints { get; } = new();
|
||||||
|
|
||||||
|
public MockSiteStreamGrpcClientFactory(MockSiteStreamGrpcClient mockClient)
|
||||||
|
: base(Microsoft.Extensions.Logging.Abstractions.NullLoggerFactory.Instance)
|
||||||
|
{
|
||||||
|
_mockClient = mockClient;
|
||||||
|
}
|
||||||
|
|
||||||
|
public override SiteStreamGrpcClient GetOrCreate(string siteIdentifier, string grpcEndpoint)
|
||||||
|
{
|
||||||
|
RequestedEndpoints.Add(grpcEndpoint);
|
||||||
|
return _mockClient;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,66 @@
|
|||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
using ScadaLink.Communication.Grpc;
|
||||||
|
|
||||||
|
namespace ScadaLink.Communication.Tests.Grpc;
|
||||||
|
|
||||||
|
public class SiteStreamGrpcClientFactoryTests
|
||||||
|
{
|
||||||
|
private readonly ILoggerFactory _loggerFactory = NullLoggerFactory.Instance;
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void GetOrCreate_ReturnsSameClientForSameSite()
|
||||||
|
{
|
||||||
|
using var factory = new SiteStreamGrpcClientFactory(_loggerFactory);
|
||||||
|
|
||||||
|
var client1 = factory.GetOrCreate("site-a", "http://localhost:5100");
|
||||||
|
var client2 = factory.GetOrCreate("site-a", "http://localhost:5100");
|
||||||
|
|
||||||
|
Assert.Same(client1, client2);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void GetOrCreate_ReturnsDifferentClientsForDifferentSites()
|
||||||
|
{
|
||||||
|
using var factory = new SiteStreamGrpcClientFactory(_loggerFactory);
|
||||||
|
|
||||||
|
var client1 = factory.GetOrCreate("site-a", "http://localhost:5100");
|
||||||
|
var client2 = factory.GetOrCreate("site-b", "http://localhost:5200");
|
||||||
|
|
||||||
|
Assert.NotSame(client1, client2);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task RemoveSite_DisposesClient()
|
||||||
|
{
|
||||||
|
var factory = new SiteStreamGrpcClientFactory(_loggerFactory);
|
||||||
|
|
||||||
|
var client1 = factory.GetOrCreate("site-a", "http://localhost:5100");
|
||||||
|
await factory.RemoveSiteAsync("site-a");
|
||||||
|
|
||||||
|
// After removal, GetOrCreate should return a new instance
|
||||||
|
var client2 = factory.GetOrCreate("site-a", "http://localhost:5100");
|
||||||
|
Assert.NotSame(client1, client2);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task RemoveSite_NonExistent_DoesNotThrow()
|
||||||
|
{
|
||||||
|
var factory = new SiteStreamGrpcClientFactory(_loggerFactory);
|
||||||
|
await factory.RemoveSiteAsync("does-not-exist"); // Should not throw
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task DisposeAsync_DisposesAllClients()
|
||||||
|
{
|
||||||
|
var factory = new SiteStreamGrpcClientFactory(_loggerFactory);
|
||||||
|
|
||||||
|
factory.GetOrCreate("site-a", "http://localhost:5100");
|
||||||
|
factory.GetOrCreate("site-b", "http://localhost:5200");
|
||||||
|
|
||||||
|
await factory.DisposeAsync();
|
||||||
|
|
||||||
|
// After dispose, creating new clients should work (new instances)
|
||||||
|
// This tests that Dispose doesn't throw
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,141 @@
|
|||||||
|
using Google.Protobuf.WellKnownTypes;
|
||||||
|
using ScadaLink.Communication.Grpc;
|
||||||
|
using ScadaLink.Commons.Messages.Streaming;
|
||||||
|
using ScadaLink.Commons.Types.Enums;
|
||||||
|
|
||||||
|
namespace ScadaLink.Communication.Tests.Grpc;
|
||||||
|
|
||||||
|
public class SiteStreamGrpcClientTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public void ConvertToDomainEvent_AttributeChanged_MapsCorrectly()
|
||||||
|
{
|
||||||
|
var ts = DateTimeOffset.UtcNow;
|
||||||
|
var evt = new SiteStreamEvent
|
||||||
|
{
|
||||||
|
CorrelationId = "corr-1",
|
||||||
|
AttributeChanged = new AttributeValueUpdate
|
||||||
|
{
|
||||||
|
InstanceUniqueName = "Site1.Pump01",
|
||||||
|
AttributePath = "Modules.IO",
|
||||||
|
AttributeName = "Temperature",
|
||||||
|
Value = "42.5",
|
||||||
|
Quality = Quality.Good,
|
||||||
|
Timestamp = Timestamp.FromDateTimeOffset(ts)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
var result = SiteStreamGrpcClient.ConvertToDomainEvent(evt);
|
||||||
|
|
||||||
|
var attr = Assert.IsType<AttributeValueChanged>(result);
|
||||||
|
Assert.Equal("Site1.Pump01", attr.InstanceUniqueName);
|
||||||
|
Assert.Equal("Modules.IO", attr.AttributePath);
|
||||||
|
Assert.Equal("Temperature", attr.AttributeName);
|
||||||
|
Assert.Equal("42.5", attr.Value);
|
||||||
|
Assert.Equal("Good", attr.Quality);
|
||||||
|
Assert.Equal(ts, attr.Timestamp);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void ConvertToDomainEvent_AlarmChanged_MapsCorrectly()
|
||||||
|
{
|
||||||
|
var ts = DateTimeOffset.UtcNow;
|
||||||
|
var evt = new SiteStreamEvent
|
||||||
|
{
|
||||||
|
CorrelationId = "corr-2",
|
||||||
|
AlarmChanged = new AlarmStateUpdate
|
||||||
|
{
|
||||||
|
InstanceUniqueName = "Site1.Motor01",
|
||||||
|
AlarmName = "OverTemp",
|
||||||
|
State = AlarmStateEnum.AlarmStateActive,
|
||||||
|
Priority = 3,
|
||||||
|
Timestamp = Timestamp.FromDateTimeOffset(ts)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
var result = SiteStreamGrpcClient.ConvertToDomainEvent(evt);
|
||||||
|
|
||||||
|
var alarm = Assert.IsType<AlarmStateChanged>(result);
|
||||||
|
Assert.Equal("Site1.Motor01", alarm.InstanceUniqueName);
|
||||||
|
Assert.Equal("OverTemp", alarm.AlarmName);
|
||||||
|
Assert.Equal(AlarmState.Active, alarm.State);
|
||||||
|
Assert.Equal(3, alarm.Priority);
|
||||||
|
Assert.Equal(ts, alarm.Timestamp);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void ConvertToDomainEvent_UnknownEvent_ReturnsNull()
|
||||||
|
{
|
||||||
|
var evt = new SiteStreamEvent
|
||||||
|
{
|
||||||
|
CorrelationId = "corr-3"
|
||||||
|
// No oneof case set
|
||||||
|
};
|
||||||
|
|
||||||
|
var result = SiteStreamGrpcClient.ConvertToDomainEvent(evt);
|
||||||
|
|
||||||
|
Assert.Null(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Theory]
|
||||||
|
[InlineData(Quality.Good, "Good")]
|
||||||
|
[InlineData(Quality.Uncertain, "Uncertain")]
|
||||||
|
[InlineData(Quality.Bad, "Bad")]
|
||||||
|
[InlineData(Quality.Unspecified, "Unknown")]
|
||||||
|
public void MapQuality_AllValues(Quality input, string expected)
|
||||||
|
{
|
||||||
|
var result = SiteStreamGrpcClient.MapQuality(input);
|
||||||
|
Assert.Equal(expected, result);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Theory]
|
||||||
|
[InlineData(AlarmStateEnum.AlarmStateNormal, AlarmState.Normal)]
|
||||||
|
[InlineData(AlarmStateEnum.AlarmStateActive, AlarmState.Active)]
|
||||||
|
[InlineData(AlarmStateEnum.AlarmStateUnspecified, AlarmState.Normal)]
|
||||||
|
public void MapAlarmState_AllValues(AlarmStateEnum input, AlarmState expected)
|
||||||
|
{
|
||||||
|
var result = SiteStreamGrpcClient.MapAlarmState(input);
|
||||||
|
Assert.Equal(expected, result);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Unsubscribe_CancelsSubscription()
|
||||||
|
{
|
||||||
|
// We can't easily test the full Subscribe flow without a real gRPC server,
|
||||||
|
// but we can test the Unsubscribe path by registering a CTS directly.
|
||||||
|
// Use the internal AddSubscription helper for testability.
|
||||||
|
var client = SiteStreamGrpcClient.CreateForTesting();
|
||||||
|
|
||||||
|
var cts = new CancellationTokenSource();
|
||||||
|
client.AddSubscriptionForTesting("corr-test", cts);
|
||||||
|
|
||||||
|
Assert.False(cts.IsCancellationRequested);
|
||||||
|
|
||||||
|
client.Unsubscribe("corr-test");
|
||||||
|
|
||||||
|
Assert.True(cts.IsCancellationRequested);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Unsubscribe_NonExistent_DoesNotThrow()
|
||||||
|
{
|
||||||
|
var client = SiteStreamGrpcClient.CreateForTesting();
|
||||||
|
client.Unsubscribe("does-not-exist"); // Should not throw
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task DisposeAsync_CancelsAllSubscriptions()
|
||||||
|
{
|
||||||
|
var client = SiteStreamGrpcClient.CreateForTesting();
|
||||||
|
|
||||||
|
var cts1 = new CancellationTokenSource();
|
||||||
|
var cts2 = new CancellationTokenSource();
|
||||||
|
client.AddSubscriptionForTesting("corr-1", cts1);
|
||||||
|
client.AddSubscriptionForTesting("corr-2", cts2);
|
||||||
|
|
||||||
|
await client.DisposeAsync();
|
||||||
|
|
||||||
|
Assert.True(cts1.IsCancellationRequested);
|
||||||
|
Assert.True(cts2.IsCancellationRequested);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user