Compare commits

...

2 Commits

Author SHA1 Message Date
Joseph Doherty
2cd43b6992 feat: update DebugStreamBridgeActor to use gRPC for streaming events
After receiving the initial snapshot via ClusterClient, the bridge actor
now opens a gRPC server-streaming subscription via SiteStreamGrpcClient
for ongoing AttributeValueChanged/AlarmStateChanged events. Adds NodeA/
NodeB failover with max 3 retries, retry count reset on successful event,
and IWithTimers-based reconnect scheduling.

- DebugStreamBridgeActor: gRPC stream after snapshot, reconnect state machine
- DebugStreamService: inject SiteStreamGrpcClientFactory, resolve gRPC addresses
- ServiceCollectionExtensions: register SiteStreamGrpcClientFactory singleton
- SiteStreamGrpcClient: make SubscribeAsync/Unsubscribe virtual for testability
- SiteStreamGrpcClientFactory: make GetOrCreate virtual for testability
- New test suite: DebugStreamBridgeActorTests (8 tests)
2026-03-21 12:14:24 -04:00
Joseph Doherty
25a6022f7b feat: add SiteStreamGrpcClient and SiteStreamGrpcClientFactory
Per-site gRPC client for central-side streaming subscriptions to site
servers. SiteStreamGrpcClient manages server-streaming calls with
keepalive, converts proto events to domain types, and supports
cancellation via Unsubscribe. SiteStreamGrpcClientFactory caches one
client per site identifier.

Includes InternalsVisibleTo for test access to conversion helpers and
comprehensive unit tests for event mapping, quality/alarm-state
conversion, unsubscribe behavior, and factory caching.
2026-03-21 12:06:38 -04:00
9 changed files with 944 additions and 11 deletions

View File

@@ -2,16 +2,18 @@ using Akka.Actor;
using Akka.Event;
using ScadaLink.Commons.Messages.DebugView;
using ScadaLink.Commons.Messages.Streaming;
using ScadaLink.Communication.Grpc;
namespace ScadaLink.Communication.Actors;
/// <summary>
/// Persistent actor (one per active debug session) on the central side.
/// Sends SubscribeDebugViewRequest to the site via CentralCommunicationActor (with THIS actor
/// as the Sender), so the site's InstanceActor registers this actor as the debug subscriber.
/// Stream events flow back via Akka remoting and are forwarded to the consumer via callbacks.
/// as the Sender) to get the initial snapshot. After receiving the snapshot, opens a gRPC
/// server-streaming subscription via SiteStreamGrpcClient for ongoing events.
/// Stream events are marshalled back to the actor via Self.Tell for thread safety.
/// </summary>
public class DebugStreamBridgeActor : ReceiveActor
public class DebugStreamBridgeActor : ReceiveActor, IWithTimers
{
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly string _siteIdentifier;
@@ -20,6 +22,19 @@ public class DebugStreamBridgeActor : ReceiveActor
private readonly IActorRef _centralCommunicationActor;
private readonly Action<object> _onEvent;
private readonly Action _onTerminated;
private readonly SiteStreamGrpcClientFactory _grpcFactory;
private readonly string _grpcNodeAAddress;
private readonly string _grpcNodeBAddress;
private const int MaxRetries = 3;
private const string ReconnectTimerKey = "grpc-reconnect";
internal static TimeSpan ReconnectDelay { get; set; } = TimeSpan.FromSeconds(5);
private int _retryCount;
private bool _useNodeA = true;
private bool _stopped;
private CancellationTokenSource? _grpcCts;
public ITimerScheduler Timers { get; set; } = null!;
public DebugStreamBridgeActor(
string siteIdentifier,
@@ -27,7 +42,10 @@ public class DebugStreamBridgeActor : ReceiveActor
string correlationId,
IActorRef centralCommunicationActor,
Action<object> onEvent,
Action onTerminated)
Action onTerminated,
SiteStreamGrpcClientFactory grpcFactory,
string grpcNodeAAddress,
string grpcNodeBAddress)
{
_siteIdentifier = siteIdentifier;
_instanceUniqueName = instanceUniqueName;
@@ -35,31 +53,58 @@ public class DebugStreamBridgeActor : ReceiveActor
_centralCommunicationActor = centralCommunicationActor;
_onEvent = onEvent;
_onTerminated = onTerminated;
_grpcFactory = grpcFactory;
_grpcNodeAAddress = grpcNodeAAddress;
_grpcNodeBAddress = grpcNodeBAddress;
// Initial snapshot response from the site
// Initial snapshot response from the site (via ClusterClient)
Receive<DebugViewSnapshot>(snapshot =>
{
_log.Info("Received initial snapshot for {0} ({1} attrs, {2} alarms)",
_instanceUniqueName, snapshot.AttributeValues.Count, snapshot.AlarmStates.Count);
_onEvent(snapshot);
OpenGrpcStream();
});
// Ongoing stream events from the site's InstanceActor
Receive<AttributeValueChanged>(changed => _onEvent(changed));
Receive<AlarmStateChanged>(changed => _onEvent(changed));
// Domain events arriving via Self.Tell from gRPC callback
Receive<AttributeValueChanged>(changed =>
{
_retryCount = 0; // Successful event resets retry count
_onEvent(changed);
});
Receive<AlarmStateChanged>(changed =>
{
_retryCount = 0;
_onEvent(changed);
});
// gRPC stream error — attempt reconnection
Receive<GrpcStreamError>(msg =>
{
_log.Warning("gRPC stream error for {0}: {1}", _instanceUniqueName, msg.Exception.Message);
HandleGrpcError();
});
// Scheduled reconnection
Receive<ReconnectGrpcStream>(_ => OpenGrpcStream());
// Consumer requests stop
Receive<StopDebugStream>(_ =>
{
_log.Info("Stopping debug stream for {0}", _instanceUniqueName);
CleanupGrpc();
SendUnsubscribe();
_stopped = true;
Context.Stop(Self);
});
// Site disconnected — CentralCommunicationActor notifies us
Receive<DebugStreamTerminated>(msg =>
{
if (_stopped) return; // Idempotent — gRPC error may arrive simultaneously
_log.Warning("Debug stream terminated for {0} (site {1} disconnected)", _instanceUniqueName, msg.SiteId);
CleanupGrpc();
_stopped = true;
_onTerminated();
Context.Stop(Self);
});
@@ -69,7 +114,9 @@ public class DebugStreamBridgeActor : ReceiveActor
Receive<ReceiveTimeout>(_ =>
{
_log.Warning("Debug stream for {0} timed out (orphaned session), stopping", _instanceUniqueName);
CleanupGrpc();
SendUnsubscribe();
_stopped = true;
_onTerminated();
Context.Stop(Self);
});
@@ -79,13 +126,88 @@ public class DebugStreamBridgeActor : ReceiveActor
{
_log.Info("Starting debug stream bridge for {0} on site {1}", _instanceUniqueName, _siteIdentifier);
// Send subscribe request via CentralCommunicationActor.
// THIS actor is the Sender, so the site's InstanceActor registers us as the subscriber.
// Send subscribe request via CentralCommunicationActor for the initial snapshot.
var request = new SubscribeDebugViewRequest(_instanceUniqueName, _correlationId);
var envelope = new SiteEnvelope(_siteIdentifier, request);
_centralCommunicationActor.Tell(envelope, Self);
}
protected override void PostStop()
{
_grpcCts?.Cancel();
_grpcCts?.Dispose();
_grpcCts = null;
base.PostStop();
}
private void OpenGrpcStream()
{
if (_stopped) return;
var endpoint = _useNodeA ? _grpcNodeAAddress : _grpcNodeBAddress;
_log.Info("Opening gRPC stream for {0} to {1}", _instanceUniqueName, endpoint);
_grpcCts?.Cancel();
_grpcCts?.Dispose();
_grpcCts = new CancellationTokenSource();
var client = _grpcFactory.GetOrCreate(_siteIdentifier, endpoint);
var self = Self;
var ct = _grpcCts.Token;
// Launch as background task — onEvent and onError marshal back to actor via Tell
Task.Run(async () =>
{
await client.SubscribeAsync(
_correlationId,
_instanceUniqueName,
evt => self.Tell(evt),
ex => self.Tell(new GrpcStreamError(ex)),
ct);
}, ct);
}
private void HandleGrpcError()
{
if (_stopped) return;
_retryCount++;
if (_retryCount > MaxRetries)
{
_log.Error("gRPC stream for {0} exceeded max retries ({1}), terminating", _instanceUniqueName, MaxRetries);
CleanupGrpc();
_stopped = true;
_onTerminated();
Context.Stop(Self);
return;
}
// Flip to the other node
_useNodeA = !_useNodeA;
// First retry is immediate, subsequent retries use a short backoff
if (_retryCount == 1)
{
Self.Tell(new ReconnectGrpcStream());
}
else
{
Timers.StartSingleTimer(ReconnectTimerKey, new ReconnectGrpcStream(), ReconnectDelay);
}
}
private void CleanupGrpc()
{
_grpcCts?.Cancel();
_grpcCts?.Dispose();
_grpcCts = null;
var client = _grpcFactory.GetOrCreate(_siteIdentifier,
_useNodeA ? _grpcNodeAAddress : _grpcNodeBAddress);
client.Unsubscribe(_correlationId);
}
private void SendUnsubscribe()
{
var request = new UnsubscribeDebugViewRequest(_instanceUniqueName, _correlationId);
@@ -98,3 +220,13 @@ public class DebugStreamBridgeActor : ReceiveActor
/// Message sent to a DebugStreamBridgeActor to stop the debug stream session.
/// </summary>
public record StopDebugStream;
/// <summary>
/// Internal message indicating a gRPC stream error occurred.
/// </summary>
internal record GrpcStreamError(Exception Exception);
/// <summary>
/// Internal message to trigger gRPC stream reconnection.
/// </summary>
internal record ReconnectGrpcStream;

View File

@@ -5,6 +5,7 @@ using Microsoft.Extensions.Logging;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.DebugView;
using ScadaLink.Communication.Actors;
using ScadaLink.Communication.Grpc;
namespace ScadaLink.Communication;
@@ -17,6 +18,7 @@ public class DebugStreamService
{
private readonly CommunicationService _communicationService;
private readonly IServiceProvider _serviceProvider;
private readonly SiteStreamGrpcClientFactory _grpcClientFactory;
private readonly ILogger<DebugStreamService> _logger;
private readonly ConcurrentDictionary<string, IActorRef> _sessions = new();
private ActorSystem? _actorSystem;
@@ -24,10 +26,12 @@ public class DebugStreamService
public DebugStreamService(
CommunicationService communicationService,
IServiceProvider serviceProvider,
SiteStreamGrpcClientFactory grpcClientFactory,
ILogger<DebugStreamService> logger)
{
_communicationService = communicationService;
_serviceProvider = serviceProvider;
_grpcClientFactory = grpcClientFactory;
_logger = logger;
}
@@ -56,6 +60,8 @@ public class DebugStreamService
// Resolve instance → unique name + site
string instanceUniqueName;
string siteIdentifier;
string grpcNodeAAddress;
string grpcNodeBAddress;
using (var scope = _serviceProvider.CreateScope())
{
@@ -69,6 +75,10 @@ public class DebugStreamService
instanceUniqueName = instance.UniqueName;
siteIdentifier = site.SiteIdentifier;
grpcNodeAAddress = site.GrpcNodeAAddress
?? throw new InvalidOperationException($"Site {siteIdentifier} has no GrpcNodeAAddress configured.");
grpcNodeBAddress = site.GrpcNodeBAddress
?? throw new InvalidOperationException($"Site {siteIdentifier} has no GrpcNodeBAddress configured.");
}
var sessionId = Guid.NewGuid().ToString("N");
@@ -104,7 +114,10 @@ public class DebugStreamService
sessionId,
commActor,
onEventWrapper,
onTerminatedWrapper);
onTerminatedWrapper,
_grpcClientFactory,
grpcNodeAAddress,
grpcNodeBAddress);
var bridgeActor = system.ActorOf(props, $"debug-stream-{sessionId}");

View File

@@ -0,0 +1,179 @@
using System.Collections.Concurrent;
using Grpc.Core;
using Grpc.Net.Client;
using Microsoft.Extensions.Logging;
using ScadaLink.Commons.Messages.Streaming;
using ScadaLink.Commons.Types.Enums;
using Google.Protobuf.WellKnownTypes;
namespace ScadaLink.Communication.Grpc;
/// <summary>
/// Per-site gRPC client that manages streaming subscriptions to a site's
/// SiteStreamGrpcServer. The central-side DebugStreamBridgeActor uses this
/// to open server-streaming calls for individual instances.
/// </summary>
public class SiteStreamGrpcClient : IAsyncDisposable
{
private readonly GrpcChannel? _channel;
private readonly SiteStreamService.SiteStreamServiceClient? _client;
private readonly ILogger? _logger;
private readonly ConcurrentDictionary<string, CancellationTokenSource> _subscriptions = new();
public SiteStreamGrpcClient(string endpoint, ILogger logger)
{
_channel = GrpcChannel.ForAddress(endpoint, new GrpcChannelOptions
{
HttpHandler = new SocketsHttpHandler
{
KeepAlivePingDelay = TimeSpan.FromSeconds(15),
KeepAlivePingTimeout = TimeSpan.FromSeconds(10),
KeepAlivePingPolicy = HttpKeepAlivePingPolicy.Always
}
});
_client = new SiteStreamService.SiteStreamServiceClient(_channel);
_logger = logger;
}
/// <summary>
/// Protected constructor for unit testing without a real gRPC channel.
/// Allows subclassing for mock implementations.
/// </summary>
protected SiteStreamGrpcClient()
{
}
/// <summary>
/// Creates a test-only instance that has no gRPC channel. Used to test
/// Unsubscribe and Dispose behavior without needing a real endpoint.
/// </summary>
internal static SiteStreamGrpcClient CreateForTesting() => new();
/// <summary>
/// Registers a CancellationTokenSource for a correlation ID. Test-only.
/// </summary>
internal void AddSubscriptionForTesting(string correlationId, CancellationTokenSource cts)
{
_subscriptions[correlationId] = cts;
}
/// <summary>
/// Opens a server-streaming subscription for a specific instance.
/// This is a long-running async method; the caller launches it as a background task.
/// The <paramref name="onEvent"/> callback delivers domain events, and
/// <paramref name="onError"/> lets the caller handle reconnection.
/// </summary>
public virtual async Task SubscribeAsync(
string correlationId,
string instanceUniqueName,
Action<object> onEvent,
Action<Exception> onError,
CancellationToken ct)
{
if (_client is null)
throw new InvalidOperationException("Cannot subscribe on a test-only client.");
var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
_subscriptions[correlationId] = cts;
var request = new InstanceStreamRequest
{
CorrelationId = correlationId,
InstanceUniqueName = instanceUniqueName
};
try
{
using var call = _client.SubscribeInstance(request, cancellationToken: cts.Token);
await foreach (var evt in call.ResponseStream.ReadAllAsync(cts.Token))
{
var domainEvent = ConvertToDomainEvent(evt);
if (domainEvent != null)
onEvent(domainEvent);
}
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
{
// Normal cancellation — not an error
}
catch (Exception ex)
{
onError(ex);
}
finally
{
_subscriptions.TryRemove(correlationId, out _);
}
}
/// <summary>
/// Cancels an active subscription by correlation ID.
/// </summary>
public virtual void Unsubscribe(string correlationId)
{
if (_subscriptions.TryRemove(correlationId, out var cts))
{
cts.Cancel();
cts.Dispose();
}
}
/// <summary>
/// Converts a proto SiteStreamEvent to the corresponding domain message.
/// Internal for testability.
/// </summary>
internal static object? ConvertToDomainEvent(SiteStreamEvent evt) => evt.EventCase switch
{
SiteStreamEvent.EventOneofCase.AttributeChanged => new AttributeValueChanged(
evt.AttributeChanged.InstanceUniqueName,
evt.AttributeChanged.AttributePath,
evt.AttributeChanged.AttributeName,
evt.AttributeChanged.Value,
MapQuality(evt.AttributeChanged.Quality),
evt.AttributeChanged.Timestamp.ToDateTimeOffset()),
SiteStreamEvent.EventOneofCase.AlarmChanged => new AlarmStateChanged(
evt.AlarmChanged.InstanceUniqueName,
evt.AlarmChanged.AlarmName,
MapAlarmState(evt.AlarmChanged.State),
evt.AlarmChanged.Priority,
evt.AlarmChanged.Timestamp.ToDateTimeOffset()),
_ => null
};
/// <summary>
/// Maps proto Quality enum to domain string. Internal for testability.
/// </summary>
internal static string MapQuality(Quality quality) => quality switch
{
Quality.Good => "Good",
Quality.Uncertain => "Uncertain",
Quality.Bad => "Bad",
_ => "Unknown"
};
/// <summary>
/// Maps proto AlarmStateEnum to domain AlarmState. Internal for testability.
/// </summary>
internal static AlarmState MapAlarmState(AlarmStateEnum state) => state switch
{
AlarmStateEnum.AlarmStateNormal => AlarmState.Normal,
AlarmStateEnum.AlarmStateActive => AlarmState.Active,
_ => AlarmState.Normal
};
public async ValueTask DisposeAsync()
{
foreach (var cts in _subscriptions.Values)
{
cts.Cancel();
cts.Dispose();
}
_subscriptions.Clear();
if (_channel is not null)
_channel.Dispose();
await ValueTask.CompletedTask;
}
}

View File

@@ -0,0 +1,57 @@
using System.Collections.Concurrent;
using Microsoft.Extensions.Logging;
namespace ScadaLink.Communication.Grpc;
/// <summary>
/// Caches one <see cref="SiteStreamGrpcClient"/> per site identifier.
/// The DebugStreamBridgeActor uses this factory to obtain (or create) a
/// gRPC client for a given site before opening a streaming subscription.
/// </summary>
public class SiteStreamGrpcClientFactory : IAsyncDisposable, IDisposable
{
private readonly ConcurrentDictionary<string, SiteStreamGrpcClient> _clients = new();
private readonly ILoggerFactory _loggerFactory;
public SiteStreamGrpcClientFactory(ILoggerFactory loggerFactory)
{
_loggerFactory = loggerFactory;
}
/// <summary>
/// Returns an existing client for the site or creates a new one.
/// </summary>
public virtual SiteStreamGrpcClient GetOrCreate(string siteIdentifier, string grpcEndpoint)
{
return _clients.GetOrAdd(siteIdentifier, _ =>
{
var logger = _loggerFactory.CreateLogger<SiteStreamGrpcClient>();
return new SiteStreamGrpcClient(grpcEndpoint, logger);
});
}
/// <summary>
/// Removes and disposes the client for the given site.
/// </summary>
public async Task RemoveSiteAsync(string siteIdentifier)
{
if (_clients.TryRemove(siteIdentifier, out var client))
{
await client.DisposeAsync();
}
}
public async ValueTask DisposeAsync()
{
foreach (var client in _clients.Values)
{
await client.DisposeAsync();
}
_clients.Clear();
}
public void Dispose()
{
DisposeAsync().AsTask().GetAwaiter().GetResult();
}
}

View File

@@ -7,6 +7,10 @@
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
</PropertyGroup>
<ItemGroup>
<InternalsVisibleTo Include="ScadaLink.Communication.Tests" />
</ItemGroup>
<ItemGroup>
<FrameworkReference Include="Microsoft.AspNetCore.App" />
</ItemGroup>

View File

@@ -1,4 +1,5 @@
using Microsoft.Extensions.DependencyInjection;
using ScadaLink.Communication.Grpc;
namespace ScadaLink.Communication;
@@ -10,6 +11,7 @@ public static class ServiceCollectionExtensions
.BindConfiguration("Communication");
services.AddSingleton<CommunicationService>();
services.AddSingleton<SiteStreamGrpcClientFactory>();
services.AddSingleton<DebugStreamService>();
return services;

View File

@@ -0,0 +1,339 @@
using Akka.Actor;
using Akka.TestKit;
using Akka.TestKit.Xunit2;
using ScadaLink.Commons.Messages.DebugView;
using ScadaLink.Commons.Messages.Streaming;
using ScadaLink.Communication.Actors;
using ScadaLink.Communication.Grpc;
namespace ScadaLink.Communication.Tests.Grpc;
/// <summary>
/// Tests for DebugStreamBridgeActor with gRPC streaming integration.
/// </summary>
public class DebugStreamBridgeActorTests : TestKit
{
private const string SiteId = "site-alpha";
private const string InstanceName = "Site1.Pump01";
private const string GrpcNodeA = "http://localhost:5100";
private const string GrpcNodeB = "http://localhost:5200";
public DebugStreamBridgeActorTests() : base(@"akka.loglevel = DEBUG")
{
// Use a very short reconnect delay for testing
DebugStreamBridgeActor.ReconnectDelay = TimeSpan.FromMilliseconds(100);
}
private record TestContext(
IActorRef BridgeActor,
TestProbe CommProbe,
MockSiteStreamGrpcClient MockGrpcClient,
List<object> ReceivedEvents,
bool[] TerminatedFlag);
private TestContext CreateBridgeActor()
{
var commProbe = CreateTestProbe();
var mockClient = new MockSiteStreamGrpcClient();
var factory = new MockSiteStreamGrpcClientFactory(mockClient);
var events = new List<object>();
var terminated = new[] { false };
Action<object> onEvent = evt => { lock (events) { events.Add(evt); } };
Action onTerminated = () => terminated[0] = true;
var props = Props.Create(typeof(DebugStreamBridgeActor),
SiteId,
InstanceName,
"corr-1",
commProbe.Ref,
onEvent,
onTerminated,
factory,
GrpcNodeA,
GrpcNodeB);
var actor = Sys.ActorOf(props);
return new TestContext(actor, commProbe, mockClient, events, terminated);
}
[Fact]
public void PreStart_Sends_SubscribeDebugViewRequest_Via_ClusterClient()
{
var ctx = CreateBridgeActor();
var envelope = ctx.CommProbe.ExpectMsg<SiteEnvelope>();
Assert.Equal(SiteId, envelope.SiteId);
Assert.IsType<SubscribeDebugViewRequest>(envelope.Message);
var req = (SubscribeDebugViewRequest)envelope.Message;
Assert.Equal(InstanceName, req.InstanceUniqueName);
Assert.Equal("corr-1", req.CorrelationId);
}
[Fact]
public void On_Snapshot_Forwards_To_OnEvent_Callback()
{
var ctx = CreateBridgeActor();
ctx.CommProbe.ExpectMsg<SiteEnvelope>();
var snapshot = new DebugViewSnapshot(
InstanceName,
new List<AttributeValueChanged>(),
new List<AlarmStateChanged>(),
DateTimeOffset.UtcNow);
ctx.BridgeActor.Tell(snapshot);
AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 1; } },
TimeSpan.FromSeconds(3));
lock (ctx.ReceivedEvents) { Assert.IsType<DebugViewSnapshot>(ctx.ReceivedEvents[0]); }
}
[Fact]
public void On_Snapshot_Opens_GrpcStream()
{
var ctx = CreateBridgeActor();
ctx.CommProbe.ExpectMsg<SiteEnvelope>();
var snapshot = new DebugViewSnapshot(
InstanceName,
new List<AttributeValueChanged>(),
new List<AlarmStateChanged>(),
DateTimeOffset.UtcNow);
ctx.BridgeActor.Tell(snapshot);
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
var call = ctx.MockGrpcClient.SubscribeCalls[0];
Assert.Equal("corr-1", call.CorrelationId);
Assert.Equal(InstanceName, call.InstanceUniqueName);
}
[Fact]
public void Events_From_GrpcCallback_Forwarded_To_OnEvent()
{
var ctx = CreateBridgeActor();
ctx.CommProbe.ExpectMsg<SiteEnvelope>();
var snapshot = new DebugViewSnapshot(
InstanceName,
new List<AttributeValueChanged>(),
new List<AlarmStateChanged>(),
DateTimeOffset.UtcNow);
ctx.BridgeActor.Tell(snapshot);
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
// Simulate gRPC event arriving via the onEvent callback
var attrChange = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", DateTimeOffset.UtcNow);
ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(attrChange);
// snapshot + attr change
AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } },
TimeSpan.FromSeconds(3));
lock (ctx.ReceivedEvents) { Assert.IsType<AttributeValueChanged>(ctx.ReceivedEvents[1]); }
}
[Fact]
public void On_GrpcError_Reconnects_To_Other_Node()
{
var ctx = CreateBridgeActor();
ctx.CommProbe.ExpectMsg<SiteEnvelope>();
var snapshot = new DebugViewSnapshot(
InstanceName,
new List<AttributeValueChanged>(),
new List<AlarmStateChanged>(),
DateTimeOffset.UtcNow);
ctx.BridgeActor.Tell(snapshot);
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
// Simulate gRPC error
ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("Stream broken"));
// Should resubscribe
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5));
Assert.Equal("corr-1", ctx.MockGrpcClient.SubscribeCalls[1].CorrelationId);
}
[Fact]
public void After_MaxRetries_Terminates()
{
var ctx = CreateBridgeActor();
ctx.CommProbe.ExpectMsg<SiteEnvelope>();
var snapshot = new DebugViewSnapshot(
InstanceName,
new List<AttributeValueChanged>(),
new List<AlarmStateChanged>(),
DateTimeOffset.UtcNow);
Watch(ctx.BridgeActor);
ctx.BridgeActor.Tell(snapshot);
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
// 4 consecutive errors: initial + 3 retries, then terminate
ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("Error 1"));
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5));
ctx.MockGrpcClient.SubscribeCalls[1].OnError(new Exception("Error 2"));
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 3, TimeSpan.FromSeconds(5));
ctx.MockGrpcClient.SubscribeCalls[2].OnError(new Exception("Error 3"));
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 4, TimeSpan.FromSeconds(5));
// Fourth error exceeds max retries
ctx.MockGrpcClient.SubscribeCalls[3].OnError(new Exception("Error 4"));
ExpectTerminated(ctx.BridgeActor, TimeSpan.FromSeconds(5));
Assert.True(ctx.TerminatedFlag[0]);
}
[Fact]
public void StopDebugStream_Cancels_Grpc_And_Sends_Unsubscribe()
{
var ctx = CreateBridgeActor();
ctx.CommProbe.ExpectMsg<SiteEnvelope>(); // subscribe
var snapshot = new DebugViewSnapshot(
InstanceName,
new List<AttributeValueChanged>(),
new List<AlarmStateChanged>(),
DateTimeOffset.UtcNow);
Watch(ctx.BridgeActor);
ctx.BridgeActor.Tell(snapshot);
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
ctx.BridgeActor.Tell(new StopDebugStream());
// Should send ClusterClient unsubscribe
var envelope = ctx.CommProbe.ExpectMsg<SiteEnvelope>();
Assert.IsType<UnsubscribeDebugViewRequest>(envelope.Message);
// Should unsubscribe gRPC
AwaitCondition(() => ctx.MockGrpcClient.UnsubscribedCorrelationIds.Count > 0, TimeSpan.FromSeconds(3));
Assert.Contains("corr-1", ctx.MockGrpcClient.UnsubscribedCorrelationIds);
// Should stop self
ExpectTerminated(ctx.BridgeActor);
}
[Fact]
public void DebugStreamTerminated_Stops_Actor_Idempotently()
{
var ctx = CreateBridgeActor();
ctx.CommProbe.ExpectMsg<SiteEnvelope>();
Watch(ctx.BridgeActor);
ctx.BridgeActor.Tell(new DebugStreamTerminated(SiteId, "corr-1"));
ExpectTerminated(ctx.BridgeActor);
Assert.True(ctx.TerminatedFlag[0]);
}
[Fact]
public void Grpc_Error_Resets_RetryCount_On_Successful_Event()
{
var ctx = CreateBridgeActor();
ctx.CommProbe.ExpectMsg<SiteEnvelope>();
var snapshot = new DebugViewSnapshot(
InstanceName,
new List<AttributeValueChanged>(),
new List<AlarmStateChanged>(),
DateTimeOffset.UtcNow);
ctx.BridgeActor.Tell(snapshot);
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
// First error → retry 1
ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("Error 1"));
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5));
// Simulate successful event (resets retry count)
var attrChange = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", DateTimeOffset.UtcNow);
ctx.MockGrpcClient.SubscribeCalls[1].OnEvent(attrChange);
AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } },
TimeSpan.FromSeconds(3));
// Now another 3 errors should be tolerated (retry count was reset)
ctx.MockGrpcClient.SubscribeCalls[1].OnError(new Exception("Error 2"));
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 3, TimeSpan.FromSeconds(5));
ctx.MockGrpcClient.SubscribeCalls[2].OnError(new Exception("Error 3"));
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 4, TimeSpan.FromSeconds(5));
ctx.MockGrpcClient.SubscribeCalls[3].OnError(new Exception("Error 4"));
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 5, TimeSpan.FromSeconds(5));
// Still alive — 3 retries from the second failure point succeeded
}
}
/// <summary>
/// Mock gRPC client that records SubscribeAsync and Unsubscribe calls.
/// </summary>
internal class MockSiteStreamGrpcClient : SiteStreamGrpcClient
{
public List<MockSubscription> SubscribeCalls { get; } = new();
public List<string> UnsubscribedCorrelationIds { get; } = new();
private MockSiteStreamGrpcClient(bool _) : base() { }
public MockSiteStreamGrpcClient() : base()
{
}
public override Task SubscribeAsync(
string correlationId,
string instanceUniqueName,
Action<object> onEvent,
Action<Exception> onError,
CancellationToken ct)
{
var subscription = new MockSubscription(correlationId, instanceUniqueName, onEvent, onError, ct);
SubscribeCalls.Add(subscription);
// Return a task that completes when cancelled (simulates long-running stream)
var tcs = new TaskCompletionSource();
ct.Register(() => tcs.TrySetResult());
return tcs.Task;
}
public override void Unsubscribe(string correlationId)
{
UnsubscribedCorrelationIds.Add(correlationId);
}
}
internal record MockSubscription(
string CorrelationId,
string InstanceUniqueName,
Action<object> OnEvent,
Action<Exception> OnError,
CancellationToken CancellationToken);
/// <summary>
/// Factory that always returns the pre-configured mock client.
/// </summary>
internal class MockSiteStreamGrpcClientFactory : SiteStreamGrpcClientFactory
{
private readonly MockSiteStreamGrpcClient _mockClient;
public List<string> RequestedEndpoints { get; } = new();
public MockSiteStreamGrpcClientFactory(MockSiteStreamGrpcClient mockClient)
: base(Microsoft.Extensions.Logging.Abstractions.NullLoggerFactory.Instance)
{
_mockClient = mockClient;
}
public override SiteStreamGrpcClient GetOrCreate(string siteIdentifier, string grpcEndpoint)
{
RequestedEndpoints.Add(grpcEndpoint);
return _mockClient;
}
}

View File

@@ -0,0 +1,66 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using ScadaLink.Communication.Grpc;
namespace ScadaLink.Communication.Tests.Grpc;
public class SiteStreamGrpcClientFactoryTests
{
private readonly ILoggerFactory _loggerFactory = NullLoggerFactory.Instance;
[Fact]
public void GetOrCreate_ReturnsSameClientForSameSite()
{
using var factory = new SiteStreamGrpcClientFactory(_loggerFactory);
var client1 = factory.GetOrCreate("site-a", "http://localhost:5100");
var client2 = factory.GetOrCreate("site-a", "http://localhost:5100");
Assert.Same(client1, client2);
}
[Fact]
public void GetOrCreate_ReturnsDifferentClientsForDifferentSites()
{
using var factory = new SiteStreamGrpcClientFactory(_loggerFactory);
var client1 = factory.GetOrCreate("site-a", "http://localhost:5100");
var client2 = factory.GetOrCreate("site-b", "http://localhost:5200");
Assert.NotSame(client1, client2);
}
[Fact]
public async Task RemoveSite_DisposesClient()
{
var factory = new SiteStreamGrpcClientFactory(_loggerFactory);
var client1 = factory.GetOrCreate("site-a", "http://localhost:5100");
await factory.RemoveSiteAsync("site-a");
// After removal, GetOrCreate should return a new instance
var client2 = factory.GetOrCreate("site-a", "http://localhost:5100");
Assert.NotSame(client1, client2);
}
[Fact]
public async Task RemoveSite_NonExistent_DoesNotThrow()
{
var factory = new SiteStreamGrpcClientFactory(_loggerFactory);
await factory.RemoveSiteAsync("does-not-exist"); // Should not throw
}
[Fact]
public async Task DisposeAsync_DisposesAllClients()
{
var factory = new SiteStreamGrpcClientFactory(_loggerFactory);
factory.GetOrCreate("site-a", "http://localhost:5100");
factory.GetOrCreate("site-b", "http://localhost:5200");
await factory.DisposeAsync();
// After dispose, creating new clients should work (new instances)
// This tests that Dispose doesn't throw
}
}

View File

@@ -0,0 +1,141 @@
using Google.Protobuf.WellKnownTypes;
using ScadaLink.Communication.Grpc;
using ScadaLink.Commons.Messages.Streaming;
using ScadaLink.Commons.Types.Enums;
namespace ScadaLink.Communication.Tests.Grpc;
public class SiteStreamGrpcClientTests
{
[Fact]
public void ConvertToDomainEvent_AttributeChanged_MapsCorrectly()
{
var ts = DateTimeOffset.UtcNow;
var evt = new SiteStreamEvent
{
CorrelationId = "corr-1",
AttributeChanged = new AttributeValueUpdate
{
InstanceUniqueName = "Site1.Pump01",
AttributePath = "Modules.IO",
AttributeName = "Temperature",
Value = "42.5",
Quality = Quality.Good,
Timestamp = Timestamp.FromDateTimeOffset(ts)
}
};
var result = SiteStreamGrpcClient.ConvertToDomainEvent(evt);
var attr = Assert.IsType<AttributeValueChanged>(result);
Assert.Equal("Site1.Pump01", attr.InstanceUniqueName);
Assert.Equal("Modules.IO", attr.AttributePath);
Assert.Equal("Temperature", attr.AttributeName);
Assert.Equal("42.5", attr.Value);
Assert.Equal("Good", attr.Quality);
Assert.Equal(ts, attr.Timestamp);
}
[Fact]
public void ConvertToDomainEvent_AlarmChanged_MapsCorrectly()
{
var ts = DateTimeOffset.UtcNow;
var evt = new SiteStreamEvent
{
CorrelationId = "corr-2",
AlarmChanged = new AlarmStateUpdate
{
InstanceUniqueName = "Site1.Motor01",
AlarmName = "OverTemp",
State = AlarmStateEnum.AlarmStateActive,
Priority = 3,
Timestamp = Timestamp.FromDateTimeOffset(ts)
}
};
var result = SiteStreamGrpcClient.ConvertToDomainEvent(evt);
var alarm = Assert.IsType<AlarmStateChanged>(result);
Assert.Equal("Site1.Motor01", alarm.InstanceUniqueName);
Assert.Equal("OverTemp", alarm.AlarmName);
Assert.Equal(AlarmState.Active, alarm.State);
Assert.Equal(3, alarm.Priority);
Assert.Equal(ts, alarm.Timestamp);
}
[Fact]
public void ConvertToDomainEvent_UnknownEvent_ReturnsNull()
{
var evt = new SiteStreamEvent
{
CorrelationId = "corr-3"
// No oneof case set
};
var result = SiteStreamGrpcClient.ConvertToDomainEvent(evt);
Assert.Null(result);
}
[Theory]
[InlineData(Quality.Good, "Good")]
[InlineData(Quality.Uncertain, "Uncertain")]
[InlineData(Quality.Bad, "Bad")]
[InlineData(Quality.Unspecified, "Unknown")]
public void MapQuality_AllValues(Quality input, string expected)
{
var result = SiteStreamGrpcClient.MapQuality(input);
Assert.Equal(expected, result);
}
[Theory]
[InlineData(AlarmStateEnum.AlarmStateNormal, AlarmState.Normal)]
[InlineData(AlarmStateEnum.AlarmStateActive, AlarmState.Active)]
[InlineData(AlarmStateEnum.AlarmStateUnspecified, AlarmState.Normal)]
public void MapAlarmState_AllValues(AlarmStateEnum input, AlarmState expected)
{
var result = SiteStreamGrpcClient.MapAlarmState(input);
Assert.Equal(expected, result);
}
[Fact]
public void Unsubscribe_CancelsSubscription()
{
// We can't easily test the full Subscribe flow without a real gRPC server,
// but we can test the Unsubscribe path by registering a CTS directly.
// Use the internal AddSubscription helper for testability.
var client = SiteStreamGrpcClient.CreateForTesting();
var cts = new CancellationTokenSource();
client.AddSubscriptionForTesting("corr-test", cts);
Assert.False(cts.IsCancellationRequested);
client.Unsubscribe("corr-test");
Assert.True(cts.IsCancellationRequested);
}
[Fact]
public void Unsubscribe_NonExistent_DoesNotThrow()
{
var client = SiteStreamGrpcClient.CreateForTesting();
client.Unsubscribe("does-not-exist"); // Should not throw
}
[Fact]
public async Task DisposeAsync_CancelsAllSubscriptions()
{
var client = SiteStreamGrpcClient.CreateForTesting();
var cts1 = new CancellationTokenSource();
var cts2 = new CancellationTokenSource();
client.AddSubscriptionForTesting("corr-1", cts1);
client.AddSubscriptionForTesting("corr-2", cts2);
await client.DisposeAsync();
Assert.True(cts1.IsCancellationRequested);
Assert.True(cts2.IsCancellationRequested);
}
}