feat: update DebugStreamBridgeActor to use gRPC for streaming events

After receiving the initial snapshot via ClusterClient, the bridge actor
now opens a gRPC server-streaming subscription via SiteStreamGrpcClient
for ongoing AttributeValueChanged/AlarmStateChanged events. Adds NodeA/
NodeB failover with max 3 retries, retry count reset on successful event,
and IWithTimers-based reconnect scheduling.

- DebugStreamBridgeActor: gRPC stream after snapshot, reconnect state machine
- DebugStreamService: inject SiteStreamGrpcClientFactory, resolve gRPC addresses
- ServiceCollectionExtensions: register SiteStreamGrpcClientFactory singleton
- SiteStreamGrpcClient: make SubscribeAsync/Unsubscribe virtual for testability
- SiteStreamGrpcClientFactory: make GetOrCreate virtual for testability
- New test suite: DebugStreamBridgeActorTests (8 tests)
This commit is contained in:
Joseph Doherty
2026-03-21 12:14:24 -04:00
parent 25a6022f7b
commit 2cd43b6992
6 changed files with 503 additions and 16 deletions

View File

@@ -2,16 +2,18 @@ using Akka.Actor;
using Akka.Event;
using ScadaLink.Commons.Messages.DebugView;
using ScadaLink.Commons.Messages.Streaming;
using ScadaLink.Communication.Grpc;
namespace ScadaLink.Communication.Actors;
/// <summary>
/// Persistent actor (one per active debug session) on the central side.
/// Sends SubscribeDebugViewRequest to the site via CentralCommunicationActor (with THIS actor
/// as the Sender), so the site's InstanceActor registers this actor as the debug subscriber.
/// Stream events flow back via Akka remoting and are forwarded to the consumer via callbacks.
/// as the Sender) to get the initial snapshot. After receiving the snapshot, opens a gRPC
/// server-streaming subscription via SiteStreamGrpcClient for ongoing events.
/// Stream events are marshalled back to the actor via Self.Tell for thread safety.
/// </summary>
public class DebugStreamBridgeActor : ReceiveActor
public class DebugStreamBridgeActor : ReceiveActor, IWithTimers
{
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly string _siteIdentifier;
@@ -20,6 +22,19 @@ public class DebugStreamBridgeActor : ReceiveActor
private readonly IActorRef _centralCommunicationActor;
private readonly Action<object> _onEvent;
private readonly Action _onTerminated;
private readonly SiteStreamGrpcClientFactory _grpcFactory;
private readonly string _grpcNodeAAddress;
private readonly string _grpcNodeBAddress;
private const int MaxRetries = 3;
private const string ReconnectTimerKey = "grpc-reconnect";
internal static TimeSpan ReconnectDelay { get; set; } = TimeSpan.FromSeconds(5);
private int _retryCount;
private bool _useNodeA = true;
private bool _stopped;
private CancellationTokenSource? _grpcCts;
public ITimerScheduler Timers { get; set; } = null!;
public DebugStreamBridgeActor(
string siteIdentifier,
@@ -27,7 +42,10 @@ public class DebugStreamBridgeActor : ReceiveActor
string correlationId,
IActorRef centralCommunicationActor,
Action<object> onEvent,
Action onTerminated)
Action onTerminated,
SiteStreamGrpcClientFactory grpcFactory,
string grpcNodeAAddress,
string grpcNodeBAddress)
{
_siteIdentifier = siteIdentifier;
_instanceUniqueName = instanceUniqueName;
@@ -35,31 +53,58 @@ public class DebugStreamBridgeActor : ReceiveActor
_centralCommunicationActor = centralCommunicationActor;
_onEvent = onEvent;
_onTerminated = onTerminated;
_grpcFactory = grpcFactory;
_grpcNodeAAddress = grpcNodeAAddress;
_grpcNodeBAddress = grpcNodeBAddress;
// Initial snapshot response from the site
// Initial snapshot response from the site (via ClusterClient)
Receive<DebugViewSnapshot>(snapshot =>
{
_log.Info("Received initial snapshot for {0} ({1} attrs, {2} alarms)",
_instanceUniqueName, snapshot.AttributeValues.Count, snapshot.AlarmStates.Count);
_onEvent(snapshot);
OpenGrpcStream();
});
// Ongoing stream events from the site's InstanceActor
Receive<AttributeValueChanged>(changed => _onEvent(changed));
Receive<AlarmStateChanged>(changed => _onEvent(changed));
// Domain events arriving via Self.Tell from gRPC callback
Receive<AttributeValueChanged>(changed =>
{
_retryCount = 0; // Successful event resets retry count
_onEvent(changed);
});
Receive<AlarmStateChanged>(changed =>
{
_retryCount = 0;
_onEvent(changed);
});
// gRPC stream error — attempt reconnection
Receive<GrpcStreamError>(msg =>
{
_log.Warning("gRPC stream error for {0}: {1}", _instanceUniqueName, msg.Exception.Message);
HandleGrpcError();
});
// Scheduled reconnection
Receive<ReconnectGrpcStream>(_ => OpenGrpcStream());
// Consumer requests stop
Receive<StopDebugStream>(_ =>
{
_log.Info("Stopping debug stream for {0}", _instanceUniqueName);
CleanupGrpc();
SendUnsubscribe();
_stopped = true;
Context.Stop(Self);
});
// Site disconnected — CentralCommunicationActor notifies us
Receive<DebugStreamTerminated>(msg =>
{
if (_stopped) return; // Idempotent — gRPC error may arrive simultaneously
_log.Warning("Debug stream terminated for {0} (site {1} disconnected)", _instanceUniqueName, msg.SiteId);
CleanupGrpc();
_stopped = true;
_onTerminated();
Context.Stop(Self);
});
@@ -69,7 +114,9 @@ public class DebugStreamBridgeActor : ReceiveActor
Receive<ReceiveTimeout>(_ =>
{
_log.Warning("Debug stream for {0} timed out (orphaned session), stopping", _instanceUniqueName);
CleanupGrpc();
SendUnsubscribe();
_stopped = true;
_onTerminated();
Context.Stop(Self);
});
@@ -79,13 +126,88 @@ public class DebugStreamBridgeActor : ReceiveActor
{
_log.Info("Starting debug stream bridge for {0} on site {1}", _instanceUniqueName, _siteIdentifier);
// Send subscribe request via CentralCommunicationActor.
// THIS actor is the Sender, so the site's InstanceActor registers us as the subscriber.
// Send subscribe request via CentralCommunicationActor for the initial snapshot.
var request = new SubscribeDebugViewRequest(_instanceUniqueName, _correlationId);
var envelope = new SiteEnvelope(_siteIdentifier, request);
_centralCommunicationActor.Tell(envelope, Self);
}
protected override void PostStop()
{
_grpcCts?.Cancel();
_grpcCts?.Dispose();
_grpcCts = null;
base.PostStop();
}
private void OpenGrpcStream()
{
if (_stopped) return;
var endpoint = _useNodeA ? _grpcNodeAAddress : _grpcNodeBAddress;
_log.Info("Opening gRPC stream for {0} to {1}", _instanceUniqueName, endpoint);
_grpcCts?.Cancel();
_grpcCts?.Dispose();
_grpcCts = new CancellationTokenSource();
var client = _grpcFactory.GetOrCreate(_siteIdentifier, endpoint);
var self = Self;
var ct = _grpcCts.Token;
// Launch as background task — onEvent and onError marshal back to actor via Tell
Task.Run(async () =>
{
await client.SubscribeAsync(
_correlationId,
_instanceUniqueName,
evt => self.Tell(evt),
ex => self.Tell(new GrpcStreamError(ex)),
ct);
}, ct);
}
private void HandleGrpcError()
{
if (_stopped) return;
_retryCount++;
if (_retryCount > MaxRetries)
{
_log.Error("gRPC stream for {0} exceeded max retries ({1}), terminating", _instanceUniqueName, MaxRetries);
CleanupGrpc();
_stopped = true;
_onTerminated();
Context.Stop(Self);
return;
}
// Flip to the other node
_useNodeA = !_useNodeA;
// First retry is immediate, subsequent retries use a short backoff
if (_retryCount == 1)
{
Self.Tell(new ReconnectGrpcStream());
}
else
{
Timers.StartSingleTimer(ReconnectTimerKey, new ReconnectGrpcStream(), ReconnectDelay);
}
}
private void CleanupGrpc()
{
_grpcCts?.Cancel();
_grpcCts?.Dispose();
_grpcCts = null;
var client = _grpcFactory.GetOrCreate(_siteIdentifier,
_useNodeA ? _grpcNodeAAddress : _grpcNodeBAddress);
client.Unsubscribe(_correlationId);
}
private void SendUnsubscribe()
{
var request = new UnsubscribeDebugViewRequest(_instanceUniqueName, _correlationId);
@@ -98,3 +220,13 @@ public class DebugStreamBridgeActor : ReceiveActor
/// Message sent to a DebugStreamBridgeActor to stop the debug stream session.
/// </summary>
public record StopDebugStream;
/// <summary>
/// Internal message indicating a gRPC stream error occurred.
/// </summary>
internal record GrpcStreamError(Exception Exception);
/// <summary>
/// Internal message to trigger gRPC stream reconnection.
/// </summary>
internal record ReconnectGrpcStream;

View File

@@ -5,6 +5,7 @@ using Microsoft.Extensions.Logging;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.DebugView;
using ScadaLink.Communication.Actors;
using ScadaLink.Communication.Grpc;
namespace ScadaLink.Communication;
@@ -17,6 +18,7 @@ public class DebugStreamService
{
private readonly CommunicationService _communicationService;
private readonly IServiceProvider _serviceProvider;
private readonly SiteStreamGrpcClientFactory _grpcClientFactory;
private readonly ILogger<DebugStreamService> _logger;
private readonly ConcurrentDictionary<string, IActorRef> _sessions = new();
private ActorSystem? _actorSystem;
@@ -24,10 +26,12 @@ public class DebugStreamService
public DebugStreamService(
CommunicationService communicationService,
IServiceProvider serviceProvider,
SiteStreamGrpcClientFactory grpcClientFactory,
ILogger<DebugStreamService> logger)
{
_communicationService = communicationService;
_serviceProvider = serviceProvider;
_grpcClientFactory = grpcClientFactory;
_logger = logger;
}
@@ -56,6 +60,8 @@ public class DebugStreamService
// Resolve instance → unique name + site
string instanceUniqueName;
string siteIdentifier;
string grpcNodeAAddress;
string grpcNodeBAddress;
using (var scope = _serviceProvider.CreateScope())
{
@@ -69,6 +75,10 @@ public class DebugStreamService
instanceUniqueName = instance.UniqueName;
siteIdentifier = site.SiteIdentifier;
grpcNodeAAddress = site.GrpcNodeAAddress
?? throw new InvalidOperationException($"Site {siteIdentifier} has no GrpcNodeAAddress configured.");
grpcNodeBAddress = site.GrpcNodeBAddress
?? throw new InvalidOperationException($"Site {siteIdentifier} has no GrpcNodeBAddress configured.");
}
var sessionId = Guid.NewGuid().ToString("N");
@@ -104,7 +114,10 @@ public class DebugStreamService
sessionId,
commActor,
onEventWrapper,
onTerminatedWrapper);
onTerminatedWrapper,
_grpcClientFactory,
grpcNodeAAddress,
grpcNodeBAddress);
var bridgeActor = system.ActorOf(props, $"debug-stream-{sessionId}");

View File

@@ -36,9 +36,10 @@ public class SiteStreamGrpcClient : IAsyncDisposable
}
/// <summary>
/// Private constructor for unit testing without a real gRPC channel.
/// Protected constructor for unit testing without a real gRPC channel.
/// Allows subclassing for mock implementations.
/// </summary>
private SiteStreamGrpcClient()
protected SiteStreamGrpcClient()
{
}
@@ -62,7 +63,7 @@ public class SiteStreamGrpcClient : IAsyncDisposable
/// The <paramref name="onEvent"/> callback delivers domain events, and
/// <paramref name="onError"/> lets the caller handle reconnection.
/// </summary>
public async Task SubscribeAsync(
public virtual async Task SubscribeAsync(
string correlationId,
string instanceUniqueName,
Action<object> onEvent,
@@ -109,7 +110,7 @@ public class SiteStreamGrpcClient : IAsyncDisposable
/// <summary>
/// Cancels an active subscription by correlation ID.
/// </summary>
public void Unsubscribe(string correlationId)
public virtual void Unsubscribe(string correlationId)
{
if (_subscriptions.TryRemove(correlationId, out var cts))
{

View File

@@ -21,7 +21,7 @@ public class SiteStreamGrpcClientFactory : IAsyncDisposable, IDisposable
/// <summary>
/// Returns an existing client for the site or creates a new one.
/// </summary>
public SiteStreamGrpcClient GetOrCreate(string siteIdentifier, string grpcEndpoint)
public virtual SiteStreamGrpcClient GetOrCreate(string siteIdentifier, string grpcEndpoint)
{
return _clients.GetOrAdd(siteIdentifier, _ =>
{

View File

@@ -1,4 +1,5 @@
using Microsoft.Extensions.DependencyInjection;
using ScadaLink.Communication.Grpc;
namespace ScadaLink.Communication;
@@ -10,6 +11,7 @@ public static class ServiceCollectionExtensions
.BindConfiguration("Communication");
services.AddSingleton<CommunicationService>();
services.AddSingleton<SiteStreamGrpcClientFactory>();
services.AddSingleton<DebugStreamService>();
return services;

View File

@@ -0,0 +1,339 @@
using Akka.Actor;
using Akka.TestKit;
using Akka.TestKit.Xunit2;
using ScadaLink.Commons.Messages.DebugView;
using ScadaLink.Commons.Messages.Streaming;
using ScadaLink.Communication.Actors;
using ScadaLink.Communication.Grpc;
namespace ScadaLink.Communication.Tests.Grpc;
/// <summary>
/// Tests for DebugStreamBridgeActor with gRPC streaming integration.
/// </summary>
public class DebugStreamBridgeActorTests : TestKit
{
private const string SiteId = "site-alpha";
private const string InstanceName = "Site1.Pump01";
private const string GrpcNodeA = "http://localhost:5100";
private const string GrpcNodeB = "http://localhost:5200";
public DebugStreamBridgeActorTests() : base(@"akka.loglevel = DEBUG")
{
// Use a very short reconnect delay for testing
DebugStreamBridgeActor.ReconnectDelay = TimeSpan.FromMilliseconds(100);
}
private record TestContext(
IActorRef BridgeActor,
TestProbe CommProbe,
MockSiteStreamGrpcClient MockGrpcClient,
List<object> ReceivedEvents,
bool[] TerminatedFlag);
private TestContext CreateBridgeActor()
{
var commProbe = CreateTestProbe();
var mockClient = new MockSiteStreamGrpcClient();
var factory = new MockSiteStreamGrpcClientFactory(mockClient);
var events = new List<object>();
var terminated = new[] { false };
Action<object> onEvent = evt => { lock (events) { events.Add(evt); } };
Action onTerminated = () => terminated[0] = true;
var props = Props.Create(typeof(DebugStreamBridgeActor),
SiteId,
InstanceName,
"corr-1",
commProbe.Ref,
onEvent,
onTerminated,
factory,
GrpcNodeA,
GrpcNodeB);
var actor = Sys.ActorOf(props);
return new TestContext(actor, commProbe, mockClient, events, terminated);
}
[Fact]
public void PreStart_Sends_SubscribeDebugViewRequest_Via_ClusterClient()
{
var ctx = CreateBridgeActor();
var envelope = ctx.CommProbe.ExpectMsg<SiteEnvelope>();
Assert.Equal(SiteId, envelope.SiteId);
Assert.IsType<SubscribeDebugViewRequest>(envelope.Message);
var req = (SubscribeDebugViewRequest)envelope.Message;
Assert.Equal(InstanceName, req.InstanceUniqueName);
Assert.Equal("corr-1", req.CorrelationId);
}
[Fact]
public void On_Snapshot_Forwards_To_OnEvent_Callback()
{
var ctx = CreateBridgeActor();
ctx.CommProbe.ExpectMsg<SiteEnvelope>();
var snapshot = new DebugViewSnapshot(
InstanceName,
new List<AttributeValueChanged>(),
new List<AlarmStateChanged>(),
DateTimeOffset.UtcNow);
ctx.BridgeActor.Tell(snapshot);
AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 1; } },
TimeSpan.FromSeconds(3));
lock (ctx.ReceivedEvents) { Assert.IsType<DebugViewSnapshot>(ctx.ReceivedEvents[0]); }
}
[Fact]
public void On_Snapshot_Opens_GrpcStream()
{
var ctx = CreateBridgeActor();
ctx.CommProbe.ExpectMsg<SiteEnvelope>();
var snapshot = new DebugViewSnapshot(
InstanceName,
new List<AttributeValueChanged>(),
new List<AlarmStateChanged>(),
DateTimeOffset.UtcNow);
ctx.BridgeActor.Tell(snapshot);
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
var call = ctx.MockGrpcClient.SubscribeCalls[0];
Assert.Equal("corr-1", call.CorrelationId);
Assert.Equal(InstanceName, call.InstanceUniqueName);
}
[Fact]
public void Events_From_GrpcCallback_Forwarded_To_OnEvent()
{
var ctx = CreateBridgeActor();
ctx.CommProbe.ExpectMsg<SiteEnvelope>();
var snapshot = new DebugViewSnapshot(
InstanceName,
new List<AttributeValueChanged>(),
new List<AlarmStateChanged>(),
DateTimeOffset.UtcNow);
ctx.BridgeActor.Tell(snapshot);
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
// Simulate gRPC event arriving via the onEvent callback
var attrChange = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", DateTimeOffset.UtcNow);
ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(attrChange);
// snapshot + attr change
AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } },
TimeSpan.FromSeconds(3));
lock (ctx.ReceivedEvents) { Assert.IsType<AttributeValueChanged>(ctx.ReceivedEvents[1]); }
}
[Fact]
public void On_GrpcError_Reconnects_To_Other_Node()
{
var ctx = CreateBridgeActor();
ctx.CommProbe.ExpectMsg<SiteEnvelope>();
var snapshot = new DebugViewSnapshot(
InstanceName,
new List<AttributeValueChanged>(),
new List<AlarmStateChanged>(),
DateTimeOffset.UtcNow);
ctx.BridgeActor.Tell(snapshot);
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
// Simulate gRPC error
ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("Stream broken"));
// Should resubscribe
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5));
Assert.Equal("corr-1", ctx.MockGrpcClient.SubscribeCalls[1].CorrelationId);
}
[Fact]
public void After_MaxRetries_Terminates()
{
var ctx = CreateBridgeActor();
ctx.CommProbe.ExpectMsg<SiteEnvelope>();
var snapshot = new DebugViewSnapshot(
InstanceName,
new List<AttributeValueChanged>(),
new List<AlarmStateChanged>(),
DateTimeOffset.UtcNow);
Watch(ctx.BridgeActor);
ctx.BridgeActor.Tell(snapshot);
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
// 4 consecutive errors: initial + 3 retries, then terminate
ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("Error 1"));
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5));
ctx.MockGrpcClient.SubscribeCalls[1].OnError(new Exception("Error 2"));
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 3, TimeSpan.FromSeconds(5));
ctx.MockGrpcClient.SubscribeCalls[2].OnError(new Exception("Error 3"));
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 4, TimeSpan.FromSeconds(5));
// Fourth error exceeds max retries
ctx.MockGrpcClient.SubscribeCalls[3].OnError(new Exception("Error 4"));
ExpectTerminated(ctx.BridgeActor, TimeSpan.FromSeconds(5));
Assert.True(ctx.TerminatedFlag[0]);
}
[Fact]
public void StopDebugStream_Cancels_Grpc_And_Sends_Unsubscribe()
{
var ctx = CreateBridgeActor();
ctx.CommProbe.ExpectMsg<SiteEnvelope>(); // subscribe
var snapshot = new DebugViewSnapshot(
InstanceName,
new List<AttributeValueChanged>(),
new List<AlarmStateChanged>(),
DateTimeOffset.UtcNow);
Watch(ctx.BridgeActor);
ctx.BridgeActor.Tell(snapshot);
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
ctx.BridgeActor.Tell(new StopDebugStream());
// Should send ClusterClient unsubscribe
var envelope = ctx.CommProbe.ExpectMsg<SiteEnvelope>();
Assert.IsType<UnsubscribeDebugViewRequest>(envelope.Message);
// Should unsubscribe gRPC
AwaitCondition(() => ctx.MockGrpcClient.UnsubscribedCorrelationIds.Count > 0, TimeSpan.FromSeconds(3));
Assert.Contains("corr-1", ctx.MockGrpcClient.UnsubscribedCorrelationIds);
// Should stop self
ExpectTerminated(ctx.BridgeActor);
}
[Fact]
public void DebugStreamTerminated_Stops_Actor_Idempotently()
{
var ctx = CreateBridgeActor();
ctx.CommProbe.ExpectMsg<SiteEnvelope>();
Watch(ctx.BridgeActor);
ctx.BridgeActor.Tell(new DebugStreamTerminated(SiteId, "corr-1"));
ExpectTerminated(ctx.BridgeActor);
Assert.True(ctx.TerminatedFlag[0]);
}
[Fact]
public void Grpc_Error_Resets_RetryCount_On_Successful_Event()
{
var ctx = CreateBridgeActor();
ctx.CommProbe.ExpectMsg<SiteEnvelope>();
var snapshot = new DebugViewSnapshot(
InstanceName,
new List<AttributeValueChanged>(),
new List<AlarmStateChanged>(),
DateTimeOffset.UtcNow);
ctx.BridgeActor.Tell(snapshot);
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
// First error → retry 1
ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("Error 1"));
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5));
// Simulate successful event (resets retry count)
var attrChange = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", DateTimeOffset.UtcNow);
ctx.MockGrpcClient.SubscribeCalls[1].OnEvent(attrChange);
AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } },
TimeSpan.FromSeconds(3));
// Now another 3 errors should be tolerated (retry count was reset)
ctx.MockGrpcClient.SubscribeCalls[1].OnError(new Exception("Error 2"));
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 3, TimeSpan.FromSeconds(5));
ctx.MockGrpcClient.SubscribeCalls[2].OnError(new Exception("Error 3"));
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 4, TimeSpan.FromSeconds(5));
ctx.MockGrpcClient.SubscribeCalls[3].OnError(new Exception("Error 4"));
AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 5, TimeSpan.FromSeconds(5));
// Still alive — 3 retries from the second failure point succeeded
}
}
/// <summary>
/// Mock gRPC client that records SubscribeAsync and Unsubscribe calls.
/// </summary>
internal class MockSiteStreamGrpcClient : SiteStreamGrpcClient
{
public List<MockSubscription> SubscribeCalls { get; } = new();
public List<string> UnsubscribedCorrelationIds { get; } = new();
private MockSiteStreamGrpcClient(bool _) : base() { }
public MockSiteStreamGrpcClient() : base()
{
}
public override Task SubscribeAsync(
string correlationId,
string instanceUniqueName,
Action<object> onEvent,
Action<Exception> onError,
CancellationToken ct)
{
var subscription = new MockSubscription(correlationId, instanceUniqueName, onEvent, onError, ct);
SubscribeCalls.Add(subscription);
// Return a task that completes when cancelled (simulates long-running stream)
var tcs = new TaskCompletionSource();
ct.Register(() => tcs.TrySetResult());
return tcs.Task;
}
public override void Unsubscribe(string correlationId)
{
UnsubscribedCorrelationIds.Add(correlationId);
}
}
internal record MockSubscription(
string CorrelationId,
string InstanceUniqueName,
Action<object> OnEvent,
Action<Exception> OnError,
CancellationToken CancellationToken);
/// <summary>
/// Factory that always returns the pre-configured mock client.
/// </summary>
internal class MockSiteStreamGrpcClientFactory : SiteStreamGrpcClientFactory
{
private readonly MockSiteStreamGrpcClient _mockClient;
public List<string> RequestedEndpoints { get; } = new();
public MockSiteStreamGrpcClientFactory(MockSiteStreamGrpcClient mockClient)
: base(Microsoft.Extensions.Logging.Abstractions.NullLoggerFactory.Instance)
{
_mockClient = mockClient;
}
public override SiteStreamGrpcClient GetOrCreate(string siteIdentifier, string grpcEndpoint)
{
RequestedEndpoints.Add(grpcEndpoint);
return _mockClient;
}
}