diff --git a/src/ScadaLink.Communication/Actors/DebugStreamBridgeActor.cs b/src/ScadaLink.Communication/Actors/DebugStreamBridgeActor.cs index 377e140..5e14aab 100644 --- a/src/ScadaLink.Communication/Actors/DebugStreamBridgeActor.cs +++ b/src/ScadaLink.Communication/Actors/DebugStreamBridgeActor.cs @@ -2,16 +2,18 @@ using Akka.Actor; using Akka.Event; using ScadaLink.Commons.Messages.DebugView; using ScadaLink.Commons.Messages.Streaming; +using ScadaLink.Communication.Grpc; namespace ScadaLink.Communication.Actors; /// /// Persistent actor (one per active debug session) on the central side. /// Sends SubscribeDebugViewRequest to the site via CentralCommunicationActor (with THIS actor -/// as the Sender), so the site's InstanceActor registers this actor as the debug subscriber. -/// Stream events flow back via Akka remoting and are forwarded to the consumer via callbacks. +/// as the Sender) to get the initial snapshot. After receiving the snapshot, opens a gRPC +/// server-streaming subscription via SiteStreamGrpcClient for ongoing events. +/// Stream events are marshalled back to the actor via Self.Tell for thread safety. /// -public class DebugStreamBridgeActor : ReceiveActor +public class DebugStreamBridgeActor : ReceiveActor, IWithTimers { private readonly ILoggingAdapter _log = Context.GetLogger(); private readonly string _siteIdentifier; @@ -20,6 +22,19 @@ public class DebugStreamBridgeActor : ReceiveActor private readonly IActorRef _centralCommunicationActor; private readonly Action _onEvent; private readonly Action _onTerminated; + private readonly SiteStreamGrpcClientFactory _grpcFactory; + private readonly string _grpcNodeAAddress; + private readonly string _grpcNodeBAddress; + + private const int MaxRetries = 3; + private const string ReconnectTimerKey = "grpc-reconnect"; + internal static TimeSpan ReconnectDelay { get; set; } = TimeSpan.FromSeconds(5); + private int _retryCount; + private bool _useNodeA = true; + private bool _stopped; + private CancellationTokenSource? _grpcCts; + + public ITimerScheduler Timers { get; set; } = null!; public DebugStreamBridgeActor( string siteIdentifier, @@ -27,7 +42,10 @@ public class DebugStreamBridgeActor : ReceiveActor string correlationId, IActorRef centralCommunicationActor, Action onEvent, - Action onTerminated) + Action onTerminated, + SiteStreamGrpcClientFactory grpcFactory, + string grpcNodeAAddress, + string grpcNodeBAddress) { _siteIdentifier = siteIdentifier; _instanceUniqueName = instanceUniqueName; @@ -35,31 +53,58 @@ public class DebugStreamBridgeActor : ReceiveActor _centralCommunicationActor = centralCommunicationActor; _onEvent = onEvent; _onTerminated = onTerminated; + _grpcFactory = grpcFactory; + _grpcNodeAAddress = grpcNodeAAddress; + _grpcNodeBAddress = grpcNodeBAddress; - // Initial snapshot response from the site + // Initial snapshot response from the site (via ClusterClient) Receive(snapshot => { _log.Info("Received initial snapshot for {0} ({1} attrs, {2} alarms)", _instanceUniqueName, snapshot.AttributeValues.Count, snapshot.AlarmStates.Count); _onEvent(snapshot); + OpenGrpcStream(); }); - // Ongoing stream events from the site's InstanceActor - Receive(changed => _onEvent(changed)); - Receive(changed => _onEvent(changed)); + // Domain events arriving via Self.Tell from gRPC callback + Receive(changed => + { + _retryCount = 0; // Successful event resets retry count + _onEvent(changed); + }); + Receive(changed => + { + _retryCount = 0; + _onEvent(changed); + }); + + // gRPC stream error — attempt reconnection + Receive(msg => + { + _log.Warning("gRPC stream error for {0}: {1}", _instanceUniqueName, msg.Exception.Message); + HandleGrpcError(); + }); + + // Scheduled reconnection + Receive(_ => OpenGrpcStream()); // Consumer requests stop Receive(_ => { _log.Info("Stopping debug stream for {0}", _instanceUniqueName); + CleanupGrpc(); SendUnsubscribe(); + _stopped = true; Context.Stop(Self); }); // Site disconnected — CentralCommunicationActor notifies us Receive(msg => { + if (_stopped) return; // Idempotent — gRPC error may arrive simultaneously _log.Warning("Debug stream terminated for {0} (site {1} disconnected)", _instanceUniqueName, msg.SiteId); + CleanupGrpc(); + _stopped = true; _onTerminated(); Context.Stop(Self); }); @@ -69,7 +114,9 @@ public class DebugStreamBridgeActor : ReceiveActor Receive(_ => { _log.Warning("Debug stream for {0} timed out (orphaned session), stopping", _instanceUniqueName); + CleanupGrpc(); SendUnsubscribe(); + _stopped = true; _onTerminated(); Context.Stop(Self); }); @@ -79,13 +126,88 @@ public class DebugStreamBridgeActor : ReceiveActor { _log.Info("Starting debug stream bridge for {0} on site {1}", _instanceUniqueName, _siteIdentifier); - // Send subscribe request via CentralCommunicationActor. - // THIS actor is the Sender, so the site's InstanceActor registers us as the subscriber. + // Send subscribe request via CentralCommunicationActor for the initial snapshot. var request = new SubscribeDebugViewRequest(_instanceUniqueName, _correlationId); var envelope = new SiteEnvelope(_siteIdentifier, request); _centralCommunicationActor.Tell(envelope, Self); } + protected override void PostStop() + { + _grpcCts?.Cancel(); + _grpcCts?.Dispose(); + _grpcCts = null; + base.PostStop(); + } + + private void OpenGrpcStream() + { + if (_stopped) return; + + var endpoint = _useNodeA ? _grpcNodeAAddress : _grpcNodeBAddress; + _log.Info("Opening gRPC stream for {0} to {1}", _instanceUniqueName, endpoint); + + _grpcCts?.Cancel(); + _grpcCts?.Dispose(); + _grpcCts = new CancellationTokenSource(); + + var client = _grpcFactory.GetOrCreate(_siteIdentifier, endpoint); + var self = Self; + var ct = _grpcCts.Token; + + // Launch as background task — onEvent and onError marshal back to actor via Tell + Task.Run(async () => + { + await client.SubscribeAsync( + _correlationId, + _instanceUniqueName, + evt => self.Tell(evt), + ex => self.Tell(new GrpcStreamError(ex)), + ct); + }, ct); + } + + private void HandleGrpcError() + { + if (_stopped) return; + + _retryCount++; + + if (_retryCount > MaxRetries) + { + _log.Error("gRPC stream for {0} exceeded max retries ({1}), terminating", _instanceUniqueName, MaxRetries); + CleanupGrpc(); + _stopped = true; + _onTerminated(); + Context.Stop(Self); + return; + } + + // Flip to the other node + _useNodeA = !_useNodeA; + + // First retry is immediate, subsequent retries use a short backoff + if (_retryCount == 1) + { + Self.Tell(new ReconnectGrpcStream()); + } + else + { + Timers.StartSingleTimer(ReconnectTimerKey, new ReconnectGrpcStream(), ReconnectDelay); + } + } + + private void CleanupGrpc() + { + _grpcCts?.Cancel(); + _grpcCts?.Dispose(); + _grpcCts = null; + + var client = _grpcFactory.GetOrCreate(_siteIdentifier, + _useNodeA ? _grpcNodeAAddress : _grpcNodeBAddress); + client.Unsubscribe(_correlationId); + } + private void SendUnsubscribe() { var request = new UnsubscribeDebugViewRequest(_instanceUniqueName, _correlationId); @@ -98,3 +220,13 @@ public class DebugStreamBridgeActor : ReceiveActor /// Message sent to a DebugStreamBridgeActor to stop the debug stream session. /// public record StopDebugStream; + +/// +/// Internal message indicating a gRPC stream error occurred. +/// +internal record GrpcStreamError(Exception Exception); + +/// +/// Internal message to trigger gRPC stream reconnection. +/// +internal record ReconnectGrpcStream; diff --git a/src/ScadaLink.Communication/DebugStreamService.cs b/src/ScadaLink.Communication/DebugStreamService.cs index 73cf42c..b17197e 100644 --- a/src/ScadaLink.Communication/DebugStreamService.cs +++ b/src/ScadaLink.Communication/DebugStreamService.cs @@ -5,6 +5,7 @@ using Microsoft.Extensions.Logging; using ScadaLink.Commons.Interfaces.Repositories; using ScadaLink.Commons.Messages.DebugView; using ScadaLink.Communication.Actors; +using ScadaLink.Communication.Grpc; namespace ScadaLink.Communication; @@ -17,6 +18,7 @@ public class DebugStreamService { private readonly CommunicationService _communicationService; private readonly IServiceProvider _serviceProvider; + private readonly SiteStreamGrpcClientFactory _grpcClientFactory; private readonly ILogger _logger; private readonly ConcurrentDictionary _sessions = new(); private ActorSystem? _actorSystem; @@ -24,10 +26,12 @@ public class DebugStreamService public DebugStreamService( CommunicationService communicationService, IServiceProvider serviceProvider, + SiteStreamGrpcClientFactory grpcClientFactory, ILogger logger) { _communicationService = communicationService; _serviceProvider = serviceProvider; + _grpcClientFactory = grpcClientFactory; _logger = logger; } @@ -56,6 +60,8 @@ public class DebugStreamService // Resolve instance → unique name + site string instanceUniqueName; string siteIdentifier; + string grpcNodeAAddress; + string grpcNodeBAddress; using (var scope = _serviceProvider.CreateScope()) { @@ -69,6 +75,10 @@ public class DebugStreamService instanceUniqueName = instance.UniqueName; siteIdentifier = site.SiteIdentifier; + grpcNodeAAddress = site.GrpcNodeAAddress + ?? throw new InvalidOperationException($"Site {siteIdentifier} has no GrpcNodeAAddress configured."); + grpcNodeBAddress = site.GrpcNodeBAddress + ?? throw new InvalidOperationException($"Site {siteIdentifier} has no GrpcNodeBAddress configured."); } var sessionId = Guid.NewGuid().ToString("N"); @@ -104,7 +114,10 @@ public class DebugStreamService sessionId, commActor, onEventWrapper, - onTerminatedWrapper); + onTerminatedWrapper, + _grpcClientFactory, + grpcNodeAAddress, + grpcNodeBAddress); var bridgeActor = system.ActorOf(props, $"debug-stream-{sessionId}"); diff --git a/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClient.cs b/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClient.cs index bfc62b6..f7c8f92 100644 --- a/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClient.cs +++ b/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClient.cs @@ -36,9 +36,10 @@ public class SiteStreamGrpcClient : IAsyncDisposable } /// - /// Private constructor for unit testing without a real gRPC channel. + /// Protected constructor for unit testing without a real gRPC channel. + /// Allows subclassing for mock implementations. /// - private SiteStreamGrpcClient() + protected SiteStreamGrpcClient() { } @@ -62,7 +63,7 @@ public class SiteStreamGrpcClient : IAsyncDisposable /// The callback delivers domain events, and /// lets the caller handle reconnection. /// - public async Task SubscribeAsync( + public virtual async Task SubscribeAsync( string correlationId, string instanceUniqueName, Action onEvent, @@ -109,7 +110,7 @@ public class SiteStreamGrpcClient : IAsyncDisposable /// /// Cancels an active subscription by correlation ID. /// - public void Unsubscribe(string correlationId) + public virtual void Unsubscribe(string correlationId) { if (_subscriptions.TryRemove(correlationId, out var cts)) { diff --git a/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClientFactory.cs b/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClientFactory.cs index 406e8c2..261e9d2 100644 --- a/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClientFactory.cs +++ b/src/ScadaLink.Communication/Grpc/SiteStreamGrpcClientFactory.cs @@ -21,7 +21,7 @@ public class SiteStreamGrpcClientFactory : IAsyncDisposable, IDisposable /// /// Returns an existing client for the site or creates a new one. /// - public SiteStreamGrpcClient GetOrCreate(string siteIdentifier, string grpcEndpoint) + public virtual SiteStreamGrpcClient GetOrCreate(string siteIdentifier, string grpcEndpoint) { return _clients.GetOrAdd(siteIdentifier, _ => { diff --git a/src/ScadaLink.Communication/ServiceCollectionExtensions.cs b/src/ScadaLink.Communication/ServiceCollectionExtensions.cs index 688ddef..330a9e0 100644 --- a/src/ScadaLink.Communication/ServiceCollectionExtensions.cs +++ b/src/ScadaLink.Communication/ServiceCollectionExtensions.cs @@ -1,4 +1,5 @@ using Microsoft.Extensions.DependencyInjection; +using ScadaLink.Communication.Grpc; namespace ScadaLink.Communication; @@ -10,6 +11,7 @@ public static class ServiceCollectionExtensions .BindConfiguration("Communication"); services.AddSingleton(); + services.AddSingleton(); services.AddSingleton(); return services; diff --git a/tests/ScadaLink.Communication.Tests/Grpc/DebugStreamBridgeActorTests.cs b/tests/ScadaLink.Communication.Tests/Grpc/DebugStreamBridgeActorTests.cs new file mode 100644 index 0000000..c713c82 --- /dev/null +++ b/tests/ScadaLink.Communication.Tests/Grpc/DebugStreamBridgeActorTests.cs @@ -0,0 +1,339 @@ +using Akka.Actor; +using Akka.TestKit; +using Akka.TestKit.Xunit2; +using ScadaLink.Commons.Messages.DebugView; +using ScadaLink.Commons.Messages.Streaming; +using ScadaLink.Communication.Actors; +using ScadaLink.Communication.Grpc; + +namespace ScadaLink.Communication.Tests.Grpc; + +/// +/// Tests for DebugStreamBridgeActor with gRPC streaming integration. +/// +public class DebugStreamBridgeActorTests : TestKit +{ + private const string SiteId = "site-alpha"; + private const string InstanceName = "Site1.Pump01"; + private const string GrpcNodeA = "http://localhost:5100"; + private const string GrpcNodeB = "http://localhost:5200"; + + public DebugStreamBridgeActorTests() : base(@"akka.loglevel = DEBUG") + { + // Use a very short reconnect delay for testing + DebugStreamBridgeActor.ReconnectDelay = TimeSpan.FromMilliseconds(100); + } + + private record TestContext( + IActorRef BridgeActor, + TestProbe CommProbe, + MockSiteStreamGrpcClient MockGrpcClient, + List ReceivedEvents, + bool[] TerminatedFlag); + + private TestContext CreateBridgeActor() + { + var commProbe = CreateTestProbe(); + var mockClient = new MockSiteStreamGrpcClient(); + var factory = new MockSiteStreamGrpcClientFactory(mockClient); + var events = new List(); + var terminated = new[] { false }; + + Action onEvent = evt => { lock (events) { events.Add(evt); } }; + Action onTerminated = () => terminated[0] = true; + + var props = Props.Create(typeof(DebugStreamBridgeActor), + SiteId, + InstanceName, + "corr-1", + commProbe.Ref, + onEvent, + onTerminated, + factory, + GrpcNodeA, + GrpcNodeB); + + var actor = Sys.ActorOf(props); + return new TestContext(actor, commProbe, mockClient, events, terminated); + } + + [Fact] + public void PreStart_Sends_SubscribeDebugViewRequest_Via_ClusterClient() + { + var ctx = CreateBridgeActor(); + + var envelope = ctx.CommProbe.ExpectMsg(); + Assert.Equal(SiteId, envelope.SiteId); + Assert.IsType(envelope.Message); + + var req = (SubscribeDebugViewRequest)envelope.Message; + Assert.Equal(InstanceName, req.InstanceUniqueName); + Assert.Equal("corr-1", req.CorrelationId); + } + + [Fact] + public void On_Snapshot_Forwards_To_OnEvent_Callback() + { + var ctx = CreateBridgeActor(); + ctx.CommProbe.ExpectMsg(); + + var snapshot = new DebugViewSnapshot( + InstanceName, + new List(), + new List(), + DateTimeOffset.UtcNow); + + ctx.BridgeActor.Tell(snapshot); + + AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 1; } }, + TimeSpan.FromSeconds(3)); + lock (ctx.ReceivedEvents) { Assert.IsType(ctx.ReceivedEvents[0]); } + } + + [Fact] + public void On_Snapshot_Opens_GrpcStream() + { + var ctx = CreateBridgeActor(); + ctx.CommProbe.ExpectMsg(); + + var snapshot = new DebugViewSnapshot( + InstanceName, + new List(), + new List(), + DateTimeOffset.UtcNow); + + ctx.BridgeActor.Tell(snapshot); + + AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3)); + + var call = ctx.MockGrpcClient.SubscribeCalls[0]; + Assert.Equal("corr-1", call.CorrelationId); + Assert.Equal(InstanceName, call.InstanceUniqueName); + } + + [Fact] + public void Events_From_GrpcCallback_Forwarded_To_OnEvent() + { + var ctx = CreateBridgeActor(); + ctx.CommProbe.ExpectMsg(); + + var snapshot = new DebugViewSnapshot( + InstanceName, + new List(), + new List(), + DateTimeOffset.UtcNow); + + ctx.BridgeActor.Tell(snapshot); + AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3)); + + // Simulate gRPC event arriving via the onEvent callback + var attrChange = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", DateTimeOffset.UtcNow); + ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(attrChange); + + // snapshot + attr change + AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } }, + TimeSpan.FromSeconds(3)); + lock (ctx.ReceivedEvents) { Assert.IsType(ctx.ReceivedEvents[1]); } + } + + [Fact] + public void On_GrpcError_Reconnects_To_Other_Node() + { + var ctx = CreateBridgeActor(); + ctx.CommProbe.ExpectMsg(); + + var snapshot = new DebugViewSnapshot( + InstanceName, + new List(), + new List(), + DateTimeOffset.UtcNow); + + ctx.BridgeActor.Tell(snapshot); + AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3)); + + // Simulate gRPC error + ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("Stream broken")); + + // Should resubscribe + AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5)); + Assert.Equal("corr-1", ctx.MockGrpcClient.SubscribeCalls[1].CorrelationId); + } + + [Fact] + public void After_MaxRetries_Terminates() + { + var ctx = CreateBridgeActor(); + ctx.CommProbe.ExpectMsg(); + + var snapshot = new DebugViewSnapshot( + InstanceName, + new List(), + new List(), + DateTimeOffset.UtcNow); + + Watch(ctx.BridgeActor); + ctx.BridgeActor.Tell(snapshot); + AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3)); + + // 4 consecutive errors: initial + 3 retries, then terminate + ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("Error 1")); + AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5)); + + ctx.MockGrpcClient.SubscribeCalls[1].OnError(new Exception("Error 2")); + AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 3, TimeSpan.FromSeconds(5)); + + ctx.MockGrpcClient.SubscribeCalls[2].OnError(new Exception("Error 3")); + AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 4, TimeSpan.FromSeconds(5)); + + // Fourth error exceeds max retries + ctx.MockGrpcClient.SubscribeCalls[3].OnError(new Exception("Error 4")); + + ExpectTerminated(ctx.BridgeActor, TimeSpan.FromSeconds(5)); + Assert.True(ctx.TerminatedFlag[0]); + } + + [Fact] + public void StopDebugStream_Cancels_Grpc_And_Sends_Unsubscribe() + { + var ctx = CreateBridgeActor(); + ctx.CommProbe.ExpectMsg(); // subscribe + + var snapshot = new DebugViewSnapshot( + InstanceName, + new List(), + new List(), + DateTimeOffset.UtcNow); + + Watch(ctx.BridgeActor); + ctx.BridgeActor.Tell(snapshot); + AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3)); + + ctx.BridgeActor.Tell(new StopDebugStream()); + + // Should send ClusterClient unsubscribe + var envelope = ctx.CommProbe.ExpectMsg(); + Assert.IsType(envelope.Message); + + // Should unsubscribe gRPC + AwaitCondition(() => ctx.MockGrpcClient.UnsubscribedCorrelationIds.Count > 0, TimeSpan.FromSeconds(3)); + Assert.Contains("corr-1", ctx.MockGrpcClient.UnsubscribedCorrelationIds); + + // Should stop self + ExpectTerminated(ctx.BridgeActor); + } + + [Fact] + public void DebugStreamTerminated_Stops_Actor_Idempotently() + { + var ctx = CreateBridgeActor(); + ctx.CommProbe.ExpectMsg(); + + Watch(ctx.BridgeActor); + ctx.BridgeActor.Tell(new DebugStreamTerminated(SiteId, "corr-1")); + + ExpectTerminated(ctx.BridgeActor); + Assert.True(ctx.TerminatedFlag[0]); + } + + [Fact] + public void Grpc_Error_Resets_RetryCount_On_Successful_Event() + { + var ctx = CreateBridgeActor(); + ctx.CommProbe.ExpectMsg(); + + var snapshot = new DebugViewSnapshot( + InstanceName, + new List(), + new List(), + DateTimeOffset.UtcNow); + + ctx.BridgeActor.Tell(snapshot); + AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3)); + + // First error → retry 1 + ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("Error 1")); + AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5)); + + // Simulate successful event (resets retry count) + var attrChange = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", DateTimeOffset.UtcNow); + ctx.MockGrpcClient.SubscribeCalls[1].OnEvent(attrChange); + AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } }, + TimeSpan.FromSeconds(3)); + + // Now another 3 errors should be tolerated (retry count was reset) + ctx.MockGrpcClient.SubscribeCalls[1].OnError(new Exception("Error 2")); + AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 3, TimeSpan.FromSeconds(5)); + + ctx.MockGrpcClient.SubscribeCalls[2].OnError(new Exception("Error 3")); + AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 4, TimeSpan.FromSeconds(5)); + + ctx.MockGrpcClient.SubscribeCalls[3].OnError(new Exception("Error 4")); + AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 5, TimeSpan.FromSeconds(5)); + + // Still alive — 3 retries from the second failure point succeeded + } +} + +/// +/// Mock gRPC client that records SubscribeAsync and Unsubscribe calls. +/// +internal class MockSiteStreamGrpcClient : SiteStreamGrpcClient +{ + public List SubscribeCalls { get; } = new(); + public List UnsubscribedCorrelationIds { get; } = new(); + + private MockSiteStreamGrpcClient(bool _) : base() { } + + public MockSiteStreamGrpcClient() : base() + { + } + + public override Task SubscribeAsync( + string correlationId, + string instanceUniqueName, + Action onEvent, + Action onError, + CancellationToken ct) + { + var subscription = new MockSubscription(correlationId, instanceUniqueName, onEvent, onError, ct); + SubscribeCalls.Add(subscription); + + // Return a task that completes when cancelled (simulates long-running stream) + var tcs = new TaskCompletionSource(); + ct.Register(() => tcs.TrySetResult()); + return tcs.Task; + } + + public override void Unsubscribe(string correlationId) + { + UnsubscribedCorrelationIds.Add(correlationId); + } +} + +internal record MockSubscription( + string CorrelationId, + string InstanceUniqueName, + Action OnEvent, + Action OnError, + CancellationToken CancellationToken); + +/// +/// Factory that always returns the pre-configured mock client. +/// +internal class MockSiteStreamGrpcClientFactory : SiteStreamGrpcClientFactory +{ + private readonly MockSiteStreamGrpcClient _mockClient; + public List RequestedEndpoints { get; } = new(); + + public MockSiteStreamGrpcClientFactory(MockSiteStreamGrpcClient mockClient) + : base(Microsoft.Extensions.Logging.Abstractions.NullLoggerFactory.Instance) + { + _mockClient = mockClient; + } + + public override SiteStreamGrpcClient GetOrCreate(string siteIdentifier, string grpcEndpoint) + { + RequestedEndpoints.Add(grpcEndpoint); + return _mockClient; + } +}